diff --git a/docs/physical-operators/StateStoreSaveExec.md b/docs/physical-operators/StateStoreSaveExec.md index 399d515c7..f44ba52fe 100644 --- a/docs/physical-operators/StateStoreSaveExec.md +++ b/docs/physical-operators/StateStoreSaveExec.md @@ -79,7 +79,7 @@ storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U] #### Append -`storeUpdateFunction` [drops late rows](StateStoreWriter.md#applyRemovingRowsOlderThanWatermark) (from the input partition) using the [watermarkPredicateForData](WatermarkSupport.md#watermarkPredicateForData). +`storeUpdateFunction` [drops late rows](StateStoreWriter.md#applyRemovingRowsOlderThanWatermark) (from the input partition) using the [watermark predicate for data](WatermarkSupport.md#watermarkPredicateForData). For every (remaining, non-late) row, `storeUpdateFunction` [stores the row](../streaming-aggregation/StreamingAggregationStateManager.md#put) in the input [StateStore](../stateful-stream-processing/StateStore.md) (using the [StreamingAggregationStateManager](#stateManager)) and increments the [number of updated state rows](#numUpdatedStateRows) metric. @@ -96,13 +96,15 @@ For every (remaining, non-late) row, `storeUpdateFunction` [stores the row](../s `storeUpdateFunction` creates a new `NextIterator`: -* When requested for next state value (row), the iterator traverses over the key-value state pairs until the [watermarkPredicateForKeys](WatermarkSupport.md#watermarkPredicateForKeys) holds `true` for the key. +* When requested for next state value (row), the iterator traverses over the key-value aggregate state pairs until the [watermark predicate for keys](WatermarkSupport.md#watermarkPredicateForKeys) holds `true` for a key. - If so, the iterator [removes the key](../streaming-aggregation/StreamingAggregationStateManager.md#remove) from the state store (via the [StreamingAggregationStateManager](#stateManager)) and increments the [numRemovedStateRows](#numRemovedStateRows) and [number of output rows](#numOutputRows) metrics. The value of the key removed is returned as the next element. + If so, the iterator [removes the key](../streaming-aggregation/StreamingAggregationStateManager.md#remove) from the state store (via the [StreamingAggregationStateManager](#stateManager)) and increments the [number of removed state rows](#numRemovedStateRows) and [number of output rows](#numOutputRows) metrics. + + The value of the removed key is returned as the next element. * When requested to close, the iterator updates the [time to remove](#allRemovalsTimeMs) metric (to be the time to process all the state rows) and [time to commit changes](#commitTimeMs) (to be the time taken for the [StreamingAggregationStateManager](#stateManager) to [commit state changes](../streaming-aggregation/StreamingAggregationStateManager.md#commit)). - In the end, the iterator [setStoreMetrics](StateStoreWriter.md#setStoreMetrics) and [setOperatorMetrics](StateStoreWriter.md#setOperatorMetrics). + In the end, the iterator records metrics of the [StateStore](StateStoreWriter.md#setStoreMetrics) and this [stateful operator](StateStoreWriter.md#setOperatorMetrics). #### Complete @@ -177,13 +179,34 @@ Estimated memory used by a [StateStore](../stateful-stream-processing/StateStore ### number of output rows === "Append" - FIXME + Number of aggregates (state rows) that were [removed from a state store](../streaming-aggregation/StreamingAggregationStateManager.md#remove) because the [watermark predicate for keys](WatermarkSupport.md#watermarkPredicateForKeys) held `true` (i.e., the watermark threshold for the keys was reached). + + Equivalent to [number of removed state rows](#numRemovedStateRows). === "Complete" - Number of rows in a [StateStore](../stateful-stream-processing/StateStore.md) (i.e. all [values](../streaming-aggregation/StreamingAggregationStateManager.md#values) in a [StateStore](../stateful-stream-processing/StateStore.md) in the [StreamingAggregationStateManager](#stateManager) that should be equivalent to the [number of total state rows](#numTotalStateRows) metric) + Number of rows in a [StateStore](../stateful-stream-processing/StateStore.md) (i.e., all the [values](../streaming-aggregation/StreamingAggregationStateManager.md#values) in a [StateStore](../stateful-stream-processing/StateStore.md) in the [StreamingAggregationStateManager](#stateManager)) + + Equivalent to the [number of total state rows](#numTotalStateRows) metric === "Update" - Number of rows that the [StreamingAggregationStateManager](#stateManager) was requested to [store in a state store](../streaming-aggregation/StreamingAggregationStateManager.md#put) (that did not expire per the optional [watermarkPredicateForData](WatermarkSupport.md#watermarkPredicateForData) predicate) that is equivalent to the [number of updated state rows](#numUpdatedStateRows) metric) + Number of rows that the [StreamingAggregationStateManager](#stateManager) was requested to [store in a state store](../streaming-aggregation/StreamingAggregationStateManager.md#put) (that did not expire per the optional [watermarkPredicateForData](WatermarkSupport.md#watermarkPredicateForData) predicate) + + Equivalent to the [number of updated state rows](#numUpdatedStateRows) metric + +### number of removed state rows + +=== "Append" + Number of aggregates (state rows) that were [removed from a state store](../streaming-aggregation/StreamingAggregationStateManager.md#remove) because the [watermark predicate for keys](WatermarkSupport.md#watermarkPredicateForKeys) held `true` (i.e., the watermark threshold for the keys was reached). + + Equivalent to [number of output rows](#numOutputRows) metric. + +=== "Complete" + Not used + +=== "Update" + Not used + +See [number of removed state rows](StateStoreWriter.md#numRemovedStateRows) ### number of rows which are dropped by watermark