Skip to content

Commit

Permalink
Key order sensitive map comparator (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi authored Aug 23, 2023
1 parent 365e412 commit 2be317c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 24 deletions.
2 changes: 1 addition & 1 deletion DIFF.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ The following alternative comparators are provided:
|`DiffComparators.epsilon(epsilon)`|Two values are equal when they are at most `epsilon` apart.<br/><br/>The comparator can be configured to use `epsilon` as an absolute (`.asAbsolute()`) threshold, or as relative (`.asRelative()`) to the larger value. Further, the threshold itself can be considered equal (`.asInclusive()`) or not equal (`.asExclusive()`):<ul><li>`DiffComparators.epsilon(epsilon).asAbsolute().asInclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) ≤ epsilon`</li><li>`DiffComparators.epsilon(epsilon).asAbsolute().asExclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) < epsilon`</li><li>`DiffComparators.epsilon(epsilon).asRelative().asInclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) ≤ epsilon * max(abs(x), abs(y))`</li><li>`DiffComparators.epsilon(epsilon).asRelative().asExclusive()`:<br/>`x` and `y` are equal iff `abs(x - y) < epsilon * max(abs(x), abs(y))`</li></ul>|
|`DiffComparators.string()`|Two `StringType` values are compared while ignoring white space differences. For this comparison, sequences of whitespaces are collapesed into single whitespaces, leading and trailing whitespaces are removed. With `DiffComparators.string(false)`, string values are compared with the default comparator.|
|`DiffComparators.duration(duration)`|Two `DateType` or `TimestampType` values are equal when they are at most `duration` apart. That duration is an instance of `java.time.Duration`.<br/><br/>The comparator can be configured to consider `duration` as equal (`.asInclusive()`) or not equal (`.asExclusive()`):<ul><li>`DiffComparators.duration(duration).asInclusive()`:<br/>`x` and `y` are equal iff `x - y ≤ duration`</li><li>`DiffComparators.duration(duration).asExclusive()`:<br/>`x` and `y` are equal iff `x - y < duration`</li></lu>|
|`DiffComparators.map[K,V]()`|Two `Map[K,V]` values are equal when they match in all their keys and values.|
|`DiffComparators.map[K,V](keyOrderSensitive)`|Two `Map[K,V]` values are equal when they match in all their keys and values. With `keyOrderSensitive=true`, the order of the keys matters, with `keyOrderSensitive=false` (default), the order of keys is ignored.|

An example:

Expand Down
11 changes: 10 additions & 1 deletion src/main/scala/uk/co/gresearch/spark/diff/DiffComparators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,14 @@ object DiffComparators {
/**
* This comparator compares two `Map[K,V]` values. They are equal when they match in all their keys and values.
*/
def map[K: Encoder, V: Encoder](): DiffComparator = MapDiffComparator[K, V]()
def map[K: Encoder, V: Encoder](): DiffComparator = MapDiffComparator[K, V](keyOrderSensitive = false)

// for backward compatibility to v2.4.0 up to v2.8.0
// replace with default value in above map when moving to v3
/**
* This comparator compares two `Map[K,V]` values. They are equal when they match in all their keys and values.
*
* @param keyOrderSensitive comparator compares key order if true
*/
def map[K: Encoder, V: Encoder](keyOrderSensitive: Boolean): DiffComparator = MapDiffComparator[K, V](keyOrderSensitive)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ case class MapDiffComparator[K, V](private val comparator: EquivDiffComparator[U
override def equiv(left: Column, right: Column): Column = comparator.equiv(left, right)
}

private case class MapDiffEquiv[K: ClassTag, V](keyType: DataType, valueType: DataType) extends math.Equiv[UnsafeMapData] {
private case class MapDiffEquiv[K: ClassTag, V](keyType: DataType, valueType: DataType, keyOrderSensitive: Boolean) extends math.Equiv[UnsafeMapData] {
override def equiv(left: UnsafeMapData, right: UnsafeMapData): Boolean = {

val leftKeysIndices: Map[K, Int] = left.keyArray().toArray(keyType).zipWithIndex.toMap
val rightKeysIndices: Map[K, Int] = right.keyArray().toArray(keyType).zipWithIndex.toMap
val leftKeys: Array[K] = left.keyArray().toArray(keyType)
val rightKeys: Array[K] = right.keyArray().toArray(keyType)

val leftKeysIndices: Map[K, Int] = leftKeys.zipWithIndex.toMap
val rightKeysIndices: Map[K, Int] = rightKeys.zipWithIndex.toMap

val leftValues = left.valueArray()
val rightValues = right.valueArray()
Expand All @@ -46,18 +49,22 @@ private case class MapDiffEquiv[K: ClassTag, V](keyType: DataType, valueType: Da
}

left.numElements() == right.numElements() &&
leftKeysIndices.keySet.diff(rightKeysIndices.keySet).isEmpty &&
(keyOrderSensitive && leftKeys.sameElements(rightKeys) || !keyOrderSensitive && leftKeys.toSet.diff(rightKeys.toSet).isEmpty) &&
valuesAreEqual.forall(identity)
}
}

case object MapDiffComparator {
def apply[K: Encoder, V: Encoder](): MapDiffComparator[K, V] = {
def apply[K: Encoder, V: Encoder](keyOrderSensitive: Boolean): MapDiffComparator[K, V] = {
val keyType = encoderFor[K].schema.fields(0).dataType
val valueType = encoderFor[V].schema.fields(0).dataType
val equiv = MapDiffEquiv(keyType, valueType)
val equiv = MapDiffEquiv(keyType, valueType, keyOrderSensitive)
val dataType = MapType(keyType, valueType)
val comparator = InputTypedEquivDiffComparator[UnsafeMapData](equiv, dataType)
MapDiffComparator[K, V](comparator)
}

// for backward compatibility to v2.4.0 up to v2.8.0
// replace with default value in above apply when moving to v3
def apply[K: Encoder, V: Encoder](): MapDiffComparator[K, V] = apply(keyOrderSensitive = false)
}
34 changes: 18 additions & 16 deletions src/test/scala/uk/co/gresearch/spark/diff/DiffComparatorSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -355,22 +355,24 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession {
}
}

Seq("true", "false").foreach { codegen =>
test(s"map comparator - codegen enabled=$codegen") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen,
SQLConf.CODEGEN_FALLBACK.key -> "false"
) {
val options = DiffOptions.default.withComparator(DiffComparators.map[Int, Long](), "map")

val actual = leftMaps.diff(rightMaps, options, "id").orderBy($"id").collect()
val diffs = Seq((1, "N"), (2, "C"), (3, "C"), (4, "D"), (5, "I"), (6, "N"), (7, "C")).toDF("id", "diff")
val expected = leftMaps.withColumnRenamed("map", "left_map")
.join(rightMaps.withColumnRenamed("map", "right_map"), Seq("id"), "fullouter")
.join(diffs, "id")
.select($"diff", $"id", $"left_map", $"right_map")
.orderBy($"id").collect()
assert(actual === expected)
Seq(true, false).foreach { sensitive =>
Seq("true", "false").foreach { codegen =>
test(s"map comparator - keyOrderSensitive=$sensitive - codegen enabled=$codegen") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen,
SQLConf.CODEGEN_FALLBACK.key -> "false"
) {
val options = DiffOptions.default.withComparator(DiffComparators.map[Int, Long](sensitive), "map")

val actual = leftMaps.diff(rightMaps, options, "id").orderBy($"id").collect()
val diffs = Seq((1, "N"), (2, "C"), (3, "C"), (4, "D"), (5, "I"), (6, if (sensitive) "C" else "N"), (7, "C")).toDF("id", "diff")
val expected = leftMaps.withColumnRenamed("map", "left_map")
.join(rightMaps.withColumnRenamed("map", "right_map"), Seq("id"), "fullouter")
.join(diffs, "id")
.select($"diff", $"id", $"left_map", $"right_map")
.orderBy($"id").collect()
assert(actual === expected)
}
}
}
}
Expand Down

0 comments on commit 2be317c

Please sign in to comment.