Watermark Expression
jaceklaskowski committed Oct 25, 2022
1 parent 8c6f1f9 commit 6b53123
Showing 1 changed file with 63 additions and 16 deletions.
79 changes: 63 additions & 16 deletions docs/physical-operators/WatermarkSupport.md
@@ -46,6 +46,33 @@ Used when:
* [StateStoreSaveExec](StateStoreSaveExec.md)
* [StreamingDeduplicateExec](StreamingDeduplicateExec.md)

## <span id="watermarkExpression"> Watermark Expression

```scala
watermarkExpression: Option[Expression]
```

??? note "Lazy Value"
`watermarkExpression` is a Scala **lazy value** to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).
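
The once-only initialization of a lazy value can be seen in a minimal, Spark-free sketch. The object name `LazyValueSketch`, the counter `initCount`, and the placeholder string are hypothetical and exist only for illustration; they are not part of `WatermarkSupport`.

```scala
object LazyValueSketch {
  // Hypothetical counter that records how often the initializer runs
  var initCount = 0

  // The block on the right-hand side runs on first access only;
  // later accesses reuse the computed value
  lazy val expensive: Option[String] = {
    initCount += 1
    Some("placeholder for a watermark expression")
  }
}
```

Accessing `LazyValueSketch.expensive` any number of times should leave `initCount` at `1`, while never accessing it leaves the counter at `0`.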

`watermarkExpression` is a `LessThanOrEqual` ([Spark SQL]({{ book.spark_sql }}/expressions/LessThanOrEqual)) expression.

!!! note
Use [withWatermark](../operators/withWatermark.md) operator to specify a watermark expression.

`watermarkExpression` [creates a watermark expression](#watermarkExpression-utility) for the following:

* [child operator](#child)'s output expressions (from the output schema of the [child operator](#child)) with [spark.watermarkDelayMs](../logical-operators/EventTimeWatermark.md#delayKey) metadata key
* [eventTimeWatermark](#eventTimeWatermark)

---

`watermarkExpression` is used when:

* `WatermarkSupport` is requested for the [watermark predicates](#watermark-predicates) (for [keys](#watermarkPredicateForKeys) and [data](#watermarkPredicateForData))

## Watermark Predicates

### <span id="watermarkPredicateForKeys"> For Keys
@@ -59,7 +86,7 @@ watermarkPredicateForKeys: Option[BasePredicate]

Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).

`watermarkPredicateForKeys` is a `Predicate` (Spark SQL) based on the [watermarkExpression](#watermarkExpression) for the [key expressions](#keyExpressions) if there is at least one key expression with [spark.watermarkDelayMs](../logical-operators/EventTimeWatermark.md#delayKey) metadata key.
`watermarkPredicateForKeys` is a `BasePredicate` ([Spark SQL]({{ book.spark_sql }}/expressions/BasePredicate)) for the [watermark expression](#watermarkExpression) (if defined) and the [key expressions](#keyExpressions) with [spark.watermarkDelayMs](../logical-operators/EventTimeWatermark.md#delayKey) metadata key.

---

@@ -79,14 +106,14 @@ watermarkPredicateForData: Option[BasePredicate]

Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).

`watermarkPredicateForData` is a `Predicate` (Spark SQL) based on the [watermarkExpression](#watermarkExpression) and the output schema of the [child operator](#child) (to match rows older than the event-time watermark).
`watermarkPredicateForData` is a `BasePredicate` ([Spark SQL]({{ book.spark_sql }}/expressions/BasePredicate)) for the [watermark expression](#watermarkExpression) (if defined) and the output schema of the [child operator](#child).
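
As a rough illustration of an optional predicate that matches rows older than the event-time watermark, the following Spark-free sketch models a row as a plain case class. `Row`, `eventTimeMs`, and `splitByWatermark` are hypothetical names introduced here; the actual `BasePredicate` is generated from a Catalyst expression and is not shown.

```scala
object WatermarkPredicateSketch {
  // Hypothetical stand-in for a row with an event-time column
  final case class Row(key: String, eventTimeMs: Long)

  // The predicate is defined only when a watermark is available,
  // mirroring the Option[...] return type described above
  def watermarkPredicateForData(watermarkMs: Option[Long]): Option[Row => Boolean] =
    watermarkMs.map(wm => (row: Row) => row.eventTimeMs <= wm)

  // Splits rows into (older than or at the watermark, the rest);
  // with no predicate, no row is considered old
  def splitByWatermark(
      rows: Seq[Row],
      predicate: Option[Row => Boolean]): (Seq[Row], Seq[Row]) =
    predicate match {
      case Some(p) => rows.partition(p)
      case None    => (Seq.empty, rows)
    }
}
```

Returning an `Option` lets a caller skip watermark-based filtering entirely when no watermark has been defined.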

---

`watermarkPredicateForData` is used when:

* `FlatMapGroupsWithStateExec` is requested to [processDataWithPartition](FlatMapGroupsWithStateExec.md#processDataWithPartition)
* `StateStoreSaveExec` is [executed](StateStoreSaveExec.md#doExecute)
* `StateStoreSaveExec` is [executed](StateStoreSaveExec.md#doExecute) (with [Append](StateStoreSaveExec.md#doExecute-Append) output mode)
* `SessionWindowStateStoreRestoreExec` is [executed](SessionWindowStateStoreRestoreExec.md#doExecute)
* `SessionWindowStateStoreSaveExec` is [executed](SessionWindowStateStoreSaveExec.md#doExecute)
* `StreamingDeduplicateExec` is [executed](StreamingDeduplicateExec.md#doExecute)
@@ -110,30 +137,50 @@ removeKeysOlderThanWatermark(
* `StreamingDeduplicateExec` is [executed](StreamingDeduplicateExec.md#doExecute) (just before finishing up)
* `StateStoreSaveExec` is [executed](StateStoreSaveExec.md#doExecute) (just before finishing up in [Update](StateStoreSaveExec.md#doExecute-Update) output mode)

## <span id="watermarkExpression"> Watermark Expression
## <span id="watermarkExpression-utility"> watermarkExpression

```scala
watermarkExpression: Option[Expression]
watermarkExpression(
optionalWatermarkExpression: Option[Expression],
optionalWatermarkMs: Option[Long]): Option[Expression]
```

??? note "Lazy Value"
`watermarkExpression` is a Scala **lazy value** to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
`watermarkExpression` is `None` (undefined) when either the given `optionalWatermarkExpression` or the `optionalWatermarkMs` is undefined.

Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).
In other words, `watermarkExpression` creates an `Expression` ([LessThanOrEqual]({{ book.spark_sql }}/expressions/LessThanOrEqual), precisely) when both `optionalWatermarkExpression` and `optionalWatermarkMs` are specified.

`watermarkExpression` is a Catalyst `Expression` ([Spark SQL]({{ book.spark_sql }}/expressions/Expression)) to find rows older than the [event-time watermark](../watermark/index.md).
`watermarkExpression` takes the watermark attribute from the given `optionalWatermarkExpression`.

!!! note
Use [withWatermark](../operators/withWatermark.md) operator to specify a watermark expression.
For a watermark attribute of `StructType` (which means it is a window), `watermarkExpression` creates a `LessThanOrEqual` that compares the end of the window with the given `optionalWatermarkMs`.

`watermarkExpression` [creates a watermark expression](#watermarkExpression-helper) for the following:
For a watermark attribute of any other type, `watermarkExpression` creates a `LessThanOrEqual` that compares the attribute itself with the given `optionalWatermarkMs`.

* [child operator](#child)'s output expressions (in the output schema of the [child operator](#child)) with [EventTimeWatermark.delayKey](../logical-operators/EventTimeWatermark.md#delayKey) metadata key
* [eventTimeWatermark](#eventTimeWatermark)
??? note "FIXME Demo"

```scala
import org.apache.spark.sql.execution.streaming.WatermarkSupport

import org.apache.spark.sql.functions.window
val w = window(timeColumn = $"time", windowDuration = "5 seconds")

val optionalWatermarkExpression = Some(w.expr)
val optionalWatermarkMs = Some(5L)

WatermarkSupport.watermarkExpression(optionalWatermarkExpression, optionalWatermarkMs)

// FIXME Resolve Catalyst expressions
// org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object
// at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:137)
// at org.apache.spark.sql.catalyst.expressions.TimeWindow.dataType(TimeWindow.scala:101)
// at org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:166)
// at org.apache.spark.sql.execution.streaming.WatermarkSupport$.watermarkExpression(statefulOperators.scala:276)
// ... 52 elided
```
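
Until the demo above works with resolved Catalyst expressions, the branching described in this section can be sketched without Spark. All types below (`Expr`, `Attr`, `WindowEnd`, `Lit` and the local `LessThanOrEqual`) are hypothetical stand-ins, not Spark classes, and any unit conversion Spark may apply to the watermark value is left out.

```scala
object WatermarkExpressionSketch {
  // Hypothetical stand-ins for Catalyst expressions (not Spark classes)
  sealed trait Expr
  // isWindowStruct models "the attribute is of StructType, i.e. a window"
  final case class Attr(name: String, isWindowStruct: Boolean) extends Expr
  final case class WindowEnd(window: Attr) extends Expr
  final case class Lit(value: Long) extends Expr
  final case class LessThanOrEqual(left: Expr, right: Expr) extends Expr

  def watermarkExpression(
      optionalWatermarkExpression: Option[Attr],
      optionalWatermarkMs: Option[Long]): Option[Expr] =
    for {
      attr <- optionalWatermarkExpression // None if the expression is undefined
      ms   <- optionalWatermarkMs         // None if the watermark is undefined
    } yield {
      // For a window, compare the end of the window; otherwise the attribute itself
      val eventTime: Expr = if (attr.isWindowStruct) WindowEnd(attr) else attr
      LessThanOrEqual(eventTime, Lit(ms))
    }
}
```

The `for` comprehension over the two options yields `None` as soon as either input is undefined, which matches the description at the top of this section.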

---

`watermarkExpression` is used when:

* `FlatMapGroupsWithStateExec` is [executed](FlatMapGroupsWithStateExec.md#doExecute) (with `EventTimeTimeout`)
* `WatermarkSupport` is requested for [watermarkPredicateForKeys](#watermarkPredicateForKeys), [watermarkPredicateForData](#watermarkPredicateForData)
* `WatermarkSupport` is requested for the [watermark expression](#watermarkExpression)
* `StreamingSymmetricHashJoinExec.OneSideHashJoiner` is requested to [storeAndJoinWithOtherSide](../streaming-join/OneSideHashJoiner.md#storeAndJoinWithOtherSide)
* `StreamingSymmetricHashJoinHelper` is requested to [getStateWatermarkPredicates](../streaming-join/StreamingSymmetricHashJoinHelper.md#getStateWatermarkPredicates)
