From 7e96d993355571c3fd37b39d888cafe26447424b Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Wed, 13 Sep 2023 12:09:46 +0200 Subject: [PATCH] Add option to save statistics only --- DIFF.md | 1 + .../uk/co/gresearch/spark/diff/App.scala | 10 +++++-- .../uk/co/gresearch/spark/diff/AppSuite.scala | 28 +++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/DIFF.md b/DIFF.md index 23ea6258..81162581 100644 --- a/DIFF.md +++ b/DIFF.md @@ -458,6 +458,7 @@ Input and output --ignore ignore column name --save-mode save mode for writing output (Append, Overwrite, ErrorIfExists, Ignore, default ErrorIfExists) --filter Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section) + --statistics Only output statistics on how many rows exist per diff action (see 'Diffing options' section) Diffing options --diff-column column name for diff column (default 'diff') diff --git a/src/main/scala/uk/co/gresearch/spark/diff/App.scala b/src/main/scala/uk/co/gresearch/spark/diff/App.scala index bf5fc0f3..2f5749ee 100644 --- a/src/main/scala/uk/co/gresearch/spark/diff/App.scala +++ b/src/main/scala/uk/co/gresearch/spark/diff/App.scala @@ -46,6 +46,7 @@ object App { ignore: Seq[String] = Seq.empty, saveMode: SaveMode = SaveMode.ErrorIfExists, filter: Set[String] = Set.empty, + statistics: Boolean = false, diffOptions: DiffOptions = DiffOptions.default) // read options from args @@ -179,6 +180,10 @@ object App { .valueName("") .action((x, c) => c.copy(filter = c.filter + x)) .text(s"Filters for rows with these diff actions, with default diffing options use 'N', 'I', 'D', or 'C' (see 'Diffing options' section)") + opt[Unit]("statistics") + .optional() + .action((_, c) => c.copy(statistics = true)) + .text(s"Only output statistics on how many rows exist per diff action (see 'Diffing options' section)") note("") note("Diffing options") @@ -244,8 +249,9 @@ object App { 
.when(schema.isDefined).call(_.schema(schema.get)) .when(format.isDefined).either(_.load(path)).or(_.table(path)) - def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode, filter: Set[String], diffOptions: DiffOptions): Unit = + def write(df: DataFrame, format: Option[String], path: String, options: Map[String, String], saveMode: SaveMode, filter: Set[String], saveStats: Boolean, diffOptions: DiffOptions): Unit = df.when(filter.nonEmpty).call(_.where(col(diffOptions.diffColumn).isInCollection(filter))) + .when(saveStats).call(_.groupBy(diffOptions.diffColumn).count) .write .when(format.isDefined).call(_.format(format.get)) .options(options) @@ -270,6 +276,6 @@ object App { val left = read(spark, options.leftFormat, options.leftPath.get, options.leftSchema, options.leftOptions) val right = read(spark, options.rightFormat, options.rightPath.get, options.rightSchema, options.rightOptions) val diff = left.diff(right, options.diffOptions, options.ids, options.ignore) - write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode, options.filter, options.diffOptions) + write(diff, options.outputFormat, options.outputPath.get, options.outputOptions, options.saveMode, options.filter, options.statistics, options.diffOptions) } } diff --git a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala index 5be0dd3d..6a0f52d9 100644 --- a/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala +++ b/src/test/scala/uk/co/gresearch/spark/diff/AppSuite.scala @@ -83,4 +83,32 @@ class AppSuite extends AnyFunSuite with SparkTestSession { } } } + + test("run app writing stats") { + withTempPath { path => + // write left dataframe as parquet + val leftPath = new File(path, "left.parquet").getAbsolutePath + DiffSuite.left(spark).write.parquet(leftPath) + + // write right dataframe as parquet + val rightPath = new File(path, 
"right.parquet").getAbsolutePath + DiffSuite.right(spark).write.parquet(rightPath) + + // launch app + val outputPath = new File(path, "diff.parquet").getAbsolutePath + App.main(Array( + "--format", "parquet", + "--statistics", + "--id", "id", + leftPath, + rightPath, + outputPath + )) + + // assert written diff + val actual = spark.read.parquet(outputPath).as[(String, Long)].collect().toMap + val expected = DiffSuite.expectedDiff.groupBy(row => row.getString(0)).view.mapValues(_.length).toMap + assert(actual === expected) + } + } }