More StateStoreSaveExec's Metrics
jaceklaskowski committed Oct 25, 2022
1 parent 6b53123 commit 9dea0d5
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions docs/physical-operators/StateStoreSaveExec.md
@@ -79,7 +79,7 @@ storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]

#### <span id="doExecute-storeUpdateFunction-Append"><span id="doExecute-Append"> Append

`storeUpdateFunction` [drops late rows](StateStoreWriter.md#applyRemovingRowsOlderThanWatermark) (from the input partition) using the [watermarkPredicateForData](WatermarkSupport.md#watermarkPredicateForData).
`storeUpdateFunction` [drops late rows](StateStoreWriter.md#applyRemovingRowsOlderThanWatermark) (from the input partition) using the [watermark predicate for data](WatermarkSupport.md#watermarkPredicateForData).

For every (remaining, non-late) row, `storeUpdateFunction` [stores the row](../streaming-aggregation/StreamingAggregationStateManager.md#put) in the input [StateStore](../stateful-stream-processing/StateStore.md) (using the [StreamingAggregationStateManager](#stateManager)) and increments the [number of updated state rows](#numUpdatedStateRows) metric.

@@ -96,13 +96,15 @@ For every (remaining, non-late) row, `storeUpdateFunction` [stores the row](../s

`storeUpdateFunction` creates a new `NextIterator`:

* When requested for next state value (row), the iterator traverses over the key-value state pairs until the [watermarkPredicateForKeys](WatermarkSupport.md#watermarkPredicateForKeys) holds `true` for the key.
* When requested for next state value (row), the iterator traverses over the key-value aggregate state pairs until the [watermark predicate for keys](WatermarkSupport.md#watermarkPredicateForKeys) holds `true` for a key.

If so, the iterator [removes the key](../streaming-aggregation/StreamingAggregationStateManager.md#remove) from the state store (via the [StreamingAggregationStateManager](#stateManager)) and increments the [numRemovedStateRows](#numRemovedStateRows) and [number of output rows](#numOutputRows) metrics. The value of the key removed is returned as the next element.
If so, the iterator [removes the key](../streaming-aggregation/StreamingAggregationStateManager.md#remove) from the state store (via the [StreamingAggregationStateManager](#stateManager)) and increments the [number of removed state rows](#numRemovedStateRows) and [number of output rows](#numOutputRows) metrics.

The value of the removed key is returned as the next element.

* When requested to close, the iterator updates the [time to remove](#allRemovalsTimeMs) metric (to be the time to process all the state rows) and [time to commit changes](#commitTimeMs) (to be the time taken for the [StreamingAggregationStateManager](#stateManager) to [commit state changes](../streaming-aggregation/StreamingAggregationStateManager.md#commit)).

In the end, the iterator [setStoreMetrics](StateStoreWriter.md#setStoreMetrics) and [setOperatorMetrics](StateStoreWriter.md#setOperatorMetrics).
In the end, the iterator records metrics of the [StateStore](StateStoreWriter.md#setStoreMetrics) and this [stateful operator](StateStoreWriter.md#setOperatorMetrics).
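
The Append flow described above can be sketched with plain Scala collections. This is only an illustration under stated assumptions, not Spark's implementation: `SketchRow`, `SketchMetrics`, and `AppendSketch` are hypothetical names, an in-memory map stands in for the `StateStore`, and a single "event time at or below the watermark" check stands in for both watermark predicates. The timing metrics updated on close are not modeled.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a state row (not a Spark class).
final case class SketchRow(key: String, eventTime: Long, value: Long)

// Hypothetical counters named after the metrics described above.
final class SketchMetrics {
  var numUpdatedStateRows = 0L
  var numRemovedStateRows = 0L
  var numOutputRows = 0L
}

object AppendSketch {
  // Simulates Append mode for one partition, with a mutable map as the state store.
  def run(
      input: Iterator[SketchRow],
      watermark: Long,
      store: mutable.LinkedHashMap[String, SketchRow],
      metrics: SketchMetrics): Iterator[SketchRow] = {
    // Assumption: a row (or key) is late/expired when its event time
    // is at or below the watermark.
    val isExpired: SketchRow => Boolean = _.eventTime <= watermark

    // Step 1: drop late rows, store the remaining ones, count every stored row.
    input.filterNot(isExpired).foreach { row =>
      store.put(row.key, row)
      metrics.numUpdatedStateRows += 1
    }

    // Step 2: take a snapshot of the expired state rows, then lazily remove
    // each one from the store and return it as the next output element.
    val expired = store.values.filter(isExpired).toList
    expired.iterator.map { row =>
      store.remove(row.key)
      metrics.numRemovedStateRows += 1
      metrics.numOutputRows += 1
      row
    }
  }
}
```

In this sketch the removal counters only advance as the returned iterator is consumed, which mirrors the "when requested for next state value" wording above.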

#### <span id="doExecute-storeUpdateFunction-Complete"> Complete

Expand Down Expand Up @@ -177,13 +179,34 @@ Estimated memory used by a [StateStore](../stateful-stream-processing/StateStore
### <span id="numOutputRows"> number of output rows

=== "Append"
FIXME
Number of aggregates (state rows) that were [removed from a state store](../streaming-aggregation/StreamingAggregationStateManager.md#remove) because the [watermark predicate for keys](WatermarkSupport.md#watermarkPredicateForKeys) held `true` (i.e., the watermark threshold for the keys was reached).

Equivalent to [number of removed state rows](#numRemovedStateRows).

=== "Complete"
Number of rows in a [StateStore](../stateful-stream-processing/StateStore.md) (i.e. all [values](../streaming-aggregation/StreamingAggregationStateManager.md#values) in a [StateStore](../stateful-stream-processing/StateStore.md) in the [StreamingAggregationStateManager](#stateManager) that should be equivalent to the [number of total state rows](#numTotalStateRows) metric)
Number of rows in a [StateStore](../stateful-stream-processing/StateStore.md) (i.e., all the [values](../streaming-aggregation/StreamingAggregationStateManager.md#values) in a [StateStore](../stateful-stream-processing/StateStore.md) in the [StreamingAggregationStateManager](#stateManager))

Equivalent to the [number of total state rows](#numTotalStateRows) metric

=== "Update"
Number of rows that the [StreamingAggregationStateManager](#stateManager) was requested to [store in a state store](../streaming-aggregation/StreamingAggregationStateManager.md#put) (that did not expire per the optional [watermarkPredicateForData](WatermarkSupport.md#watermarkPredicateForData) predicate) that is equivalent to the [number of updated state rows](#numUpdatedStateRows) metric)
Number of rows that the [StreamingAggregationStateManager](#stateManager) was requested to [store in a state store](../streaming-aggregation/StreamingAggregationStateManager.md#put) (that did not expire per the optional [watermarkPredicateForData](WatermarkSupport.md#watermarkPredicateForData) predicate)

Equivalent to the [number of updated state rows](#numUpdatedStateRows) metric
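
The per-mode equivalences for the number of output rows can be summarized as a small lookup. This is a minimal sketch that only restates the equivalences described in this section; `OutputRowsSketch`, `Counters`, and the mode objects are hypothetical names and not part of Spark.

```scala
object OutputRowsSketch {
  // Hypothetical output-mode tags for illustration only.
  sealed trait Mode
  case object Append extends Mode
  case object Complete extends Mode
  case object Update extends Mode

  // Hypothetical snapshot of the other state metrics.
  final case class Counters(
      numUpdatedStateRows: Long,
      numRemovedStateRows: Long,
      numTotalStateRows: Long)

  // Which metric the number of output rows is expected to match in each mode.
  def numOutputRows(mode: Mode, counters: Counters): Long = mode match {
    case Append   => counters.numRemovedStateRows // rows evicted by the watermark
    case Complete => counters.numTotalStateRows   // every row in the state store
    case Update   => counters.numUpdatedStateRows // rows stored in this batch
  }
}
```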

### <span id="numRemovedStateRows"> number of removed state rows

=== "Append"
Number of aggregates (state rows) that were [removed from a state store](../streaming-aggregation/StreamingAggregationStateManager.md#remove) because the [watermark predicate for keys](WatermarkSupport.md#watermarkPredicateForKeys) held `true` (i.e., the watermark threshold for the keys was reached).

Equivalent to the [number of output rows](#numOutputRows) metric.

=== "Complete"
Not used

=== "Update"
Not used

See [number of removed state rows](StateStoreWriter.md#numRemovedStateRows)

### <span id="numRowsDroppedByWatermark"> number of rows which are dropped by watermark
