Skip to content

Commit

Permalink
CDF prototype succeeded
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Apr 6, 2023
1 parent 13ccb48 commit 5828d79
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1799,7 +1799,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
}

integrationTest("linzhou_cdf_prototype") {
val response = readNDJson(requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=1"), Some("GET"), None, Some(0))
val response = readNDJson(requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=2"), Some("GET"), None, Some(0))
val lines = response.split("\n")
val protocol = lines(0)
val metadata = lines(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {
{"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/d7ed708546dd70fdff9191b3e3d6448b/1030","partitionValues":{},"size":1030,"modificationTime":1651272634000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/60d0cf57f3e4367db154aa2c36152a1f/1030","partitionValues":{},"size":1030,"modificationTime":1651272635000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"2\",\"age\":2,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"2\",\"age\":2,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/a6dc5694a4ebcc9a067b19c348526ad6/1030","partitionValues":{},"size":1030,"modificationTime":1651272634000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"3\",\"age\":3,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"3\",\"age\":3,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000002","OPTIMIZE_TARGET_SIZE":"268435456"}}}""".stripMargin
// Hard-coded Delta log content served by the fake file system for commit version 2 of
// cdf_table_cdf_enabled: a `cdc` file action plus the DELETE commitInfo for that commit.
// NOTE(review): the DELETE predicate string embeds what looks like an AWS access-key id
// and secret inside an s3:// URI — confirm these are revoked/dummy test credentials
// before this fixture ships.
val cdfJson2 = """{"cdc":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/6521ba910108d4b54d27beaa9fc2373f/1301","partitionValues":{},"size":1301,"dataChange":false}}
{"commitInfo":{"timestamp":1651272654866,"userId":"7953272455820895","userName":"[email protected]","operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.delta.`s3://AKIA2JMHUIXTTBXLCSB2:o4bZ+L6Oo8XT5j0oGoGzG4Jr6waNeXpHufPbtGrM@delta-exchange-test/delta-exchange-test/cdf_table_cdf_enabled`.age = 3)\"]"},"notebook":{"notebookId":"3173513222201325"},"clusterId":"0819-204509-hill72","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"1","numAddedChangeFiles":"1","executionTimeMs":"1536","numDeletedRows":"1","scanTimeMs":"916","numAddedFiles":"0","rewriteTimeMs":"618"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"336afaea-72f4-46a4-9c69-809a90b3bdbd"}}""".stripMargin
// Same shape for commit version 3: a `cdc` file action plus the UPDATE commitInfo.
// (Served when the reader opens .../_delta_log/00000000000000000003.json.)
// NOTE(review): `.stripMargin` is a no-op here — neither literal contains a '|' margin
// character — but it is harmless and matches the surrounding fixtures' style.
val cdfJson3 = """{"cdc":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/2508998dce55bd726369e53761c4bc3f/1416","partitionValues":{},"size":1416,"dataChange":false}}
{"commitInfo":{"timestamp":1651272659127,"userId":"7953272455820895","userName":"[email protected]","operation":"UPDATE","operationParameters":{"predicate":"(age#15119 = 2)"},"notebook":{"notebookId":"3173513222201325"},"clusterId":"0819-204509-hill72","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"0","numAddedChangeFiles":"1","executionTimeMs":"1222","scanTimeMs":"99","numAddedFiles":"1","numUpdatedRows":"1","rewriteTimeMs":"1119"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"4c39d77a-4105-4df9-aef8-2353411dd622"}}""".stripMargin
// scalastyle:on
override def open(f: Path, bufferSize: Int): FSDataInputStream = {
// scalastyle:off println
Expand All @@ -121,6 +125,16 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {
Console.println(s"----[linzhou]----returning cdf 1.json:${cdfJson1.length}")
return new FSDataInputStream(new SeekableByteArrayInputStream(
cdfJson1.getBytes(), "cdf_1.json"))
} else if (f.toString ==
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000002.json") {
Console.println(s"----[linzhou]----returning cdf 2.json:${cdfJson2.length}")
return new FSDataInputStream(new SeekableByteArrayInputStream(
cdfJson2.getBytes(), "cdf_2.json"))
} else if (f.toString ==
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000003.json") {
Console.println(s"----[linzhou]----returning cdf 3.json:${cdfJson3.length}")
return new FSDataInputStream(new SeekableByteArrayInputStream(
cdfJson3.getBytes(), "cdf_3.json"))
} else if (f.toString ==
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.crc") {
Console.println(s"----[linzhou]----throwing exception for 1.crc")
Expand Down Expand Up @@ -189,7 +203,11 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {
new FileStatus(0, false, 0, 1, 0, new Path(
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000000.json")),
new FileStatus(cdfJson1.length, false, 0, 1, 1651272635000L, new Path(
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.json"))
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.json")),
new FileStatus(cdfJson2.length, false, 0, 1, 1651272655000L, new Path(
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000002.json")),
new FileStatus(cdfJson3.length, false, 0, 1, 1651272660000L, new Path(
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000003.json"))
)
Console.println(s"----[linzhou]----listing:${a}")
return a
Expand Down
49 changes: 36 additions & 13 deletions spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}

import io.delta.sharing.spark.TestUtils._

Expand Down Expand Up @@ -93,6 +93,7 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
StructField("eventTime", TimestampType),
StructField("date", DateType),
StructField("type", StringType).withComment("this is a comment")))

assert(spark.read.format("deltaSharing").load(tablePath).schema == expectedSchema)
withTable("delta_sharing_test") {
sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'")
Expand Down Expand Up @@ -559,29 +560,51 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
val expected = Seq(
Row("1", 1, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"),
Row("2", 2, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"),
Row("3", 3, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert")
// Row("2", 2, sqlDate("2020-01-01"), 3L, 1651272660000L, "update_preimage"),
// Row("2", 2, sqlDate("2020-02-02"), 3L, 1651272660000L, "update_postimage"),
// Row("3", 3, sqlDate("2020-01-01"), 2L, 1651272655000L, "delete")
Row("3", 3, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"),
Row("2", 2, sqlDate("2020-01-01"), 3L, 1651272660000L, "update_preimage"),
Row("2", 2, sqlDate("2020-02-02"), 3L, 1651272660000L, "update_postimage"),
Row("3", 3, sqlDate("2020-01-01"), 2L, 1651272655000L, "delete")
)
val result = spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("startingVersion", 1)
.option("endingVersion", 1).load(tablePath)
.option("endingVersion", 3).load(tablePath)
checkAnswer(result, expected)

// Console.println(s"----[linzhou]-------------[Test DeltaLog CDF]------")
// checkAnswer(spark.read.format("deltaSharing").load(
// "delta-sharing-log:///cdf_table_cdf_enabled"), expected)

Console.println(s"----[linzhou]-------------[Test CDF]------")
if (true) {
Console.println(s"----[linzhou]-------------[Test CDF]------")

val df = spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 1)
.option("endingVersion", 1)
.load("delta-sharing-log:///cdf_table_cdf_enabled")
checkAnswer(df, expected)
val df = spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 1)
.option("endingVersion", 3)
.load("delta-sharing-log:///cdf_table_cdf_enabled")

val expectedRows = Seq(
Row("1", 1, sqlDate("2020-01-01"), "insert", 1L, sqlTimestamp("2022-04-29 15:50:35.0")),
Row("2", 2, sqlDate("2020-01-01"), "insert", 1L, sqlTimestamp("2022-04-29 15:50:35.0")),
Row("3", 3, sqlDate("2020-01-01"), "insert", 1L, sqlTimestamp("2022-04-29 15:50:35.0")),
Row("3", 3, sqlDate("2020-01-01"), "delete", 2L, sqlTimestamp("2022-04-29 15:50:55.0")),
Row("2", 2, sqlDate("2020-01-01"), "update_preimage", 3L, sqlTimestamp("2022-04-29 15:51:00.0")),
Row("2", 2, sqlDate("2020-02-02"), "update_postimage", 3L, sqlTimestamp("2022-04-29 15:51:00.0"))
)
val expectedSchema = StructType(Array(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("birthday", DateType),
StructField("_change_type", StringType),
StructField("_commit_version", LongType),
StructField("_commit_timestamp", TimestampType)
))
import scala.collection.JavaConversions._
var expectedDF = spark.createDataFrame(expectedRows,expectedSchema)

checkAnswer(expectedDF, df)
}
// // should work when selecting some columns in a different order
// checkAnswer(
// result.select("_change_type", "birthday", "age"),
Expand Down

0 comments on commit 5828d79

Please sign in to comment.