Skip to content

Commit

Permalink
feat(dashboard): visualize average backpressure rather than spot back…
Browse files Browse the repository at this point in the history
…pressure (#18219) (#18258)

Co-authored-by: Noel Kwan <[email protected]>
  • Loading branch information
github-actions[bot] and kwannoel authored Aug 27, 2024
1 parent 97d1a56 commit 07501ab
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 26 deletions.
6 changes: 4 additions & 2 deletions dashboard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ For example:

```bash
./risedev d
sqllogictest -p 4566 -d dev './e2e_test/nexmark/create_tables.slt.part'
sqllogictest -p 4566 -d dev './e2e_test/streaming/nexmark/create_views.slt.part'
./risedev slt e2e_test/nexmark/create_sources.slt.part
./risedev psql -c 'CREATE TABLE dimension (v1 int);'
./risedev psql -c 'CREATE MATERIALIZED VIEW mv AS SELECT auction.* FROM dimension join auction on auction.id-auction.id = dimension.v1;'
./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 50);'
```

Install dependencies and start the development server.
Expand Down
60 changes: 42 additions & 18 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,52 @@ function convertToBackPressureMetrics(
return bpMetrics
}

export function calculateBPRate(
backPressureNew: BackPressureInfo[],
backPressureOld: BackPressureInfo[],
intervalMs: number
): BackPressuresMetrics {
export function calculateCumulativeBp(
backPressureCumulative: BackPressureInfo[],
backPressureCurrent: BackPressureInfo[],
backPressureNew: BackPressureInfo[]
): BackPressureInfo[] {
let mapCumulative = convertToMapAndAgg(backPressureCumulative)
let mapCurrent = convertToMapAndAgg(backPressureCurrent)
let mapNew = convertToMapAndAgg(backPressureNew)
let mapOld = convertToMapAndAgg(backPressureOld)
let result = new Map<string, number>()
mapNew.forEach((value, key) => {
if (mapOld.has(key)) {
result.set(
key,
// The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing
((value - (mapOld.get(key) || 0)) /
((intervalMs / 1000) * 1000000000)) *
100
)
} else {
result.set(key, 0)
let mapResult = new Map<string, number>()
let keys = new Set([
...mapCumulative.keys(),
...mapCurrent.keys(),
...mapNew.keys(),
])
keys.forEach((key) => {
let backpressureCumulativeValue = mapCumulative.get(key) || 0
let backpressureCurrentValue = mapCurrent.get(key) || 0
let backpressureNewValue = mapNew.get(key) || 0
let increment = backpressureNewValue - backpressureCurrentValue
mapResult.set(key, backpressureCumulativeValue + increment)
})
const result: BackPressureInfo[] = []
mapResult.forEach((value, key) => {
const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
const backPressureInfo: BackPressureInfo = {
actorId: 0,
fragmentId,
downstreamFragmentId,
value,
}
result.push(backPressureInfo)
})
return result
}

export function calculateBPRate(
backPressureCumulative: BackPressureInfo[],
totalDurationNs: number
): BackPressuresMetrics {
let map = convertToMapAndAgg(backPressureCumulative)
let result = new Map<string, number>()
map.forEach((backpressureNs, key) => {
let backpressureRateRatio = backpressureNs / totalDurationNs
let backpressureRatePercent = backpressureRateRatio * 100
result.set(key, backpressureRatePercent)
})
return convertToBackPressureMetrics(convertFromMapAndAgg(result))
}

Expand Down
23 changes: 17 additions & 6 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
calculateBPRate,
calculateCumulativeBp,
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
Expand All @@ -54,7 +55,7 @@ interface DispatcherNode {
}

// Refresh interval (ms) for back pressure stats
const INTERVAL = 5000
const INTERVAL_MS = 5000

/** Associated data of each plan node in the fragment graph, including the dispatchers. */
export interface PlanNodeDatum {
Expand Down Expand Up @@ -187,6 +188,8 @@ const backPressureDataSources: BackPressureDataSource[] = [
interface EmbeddedBackPressureInfo {
previous: BackPressureInfo[]
current: BackPressureInfo[]
totalBackpressureNs: BackPressureInfo[]
totalDurationNs: number
}

export default function Streaming() {
Expand Down Expand Up @@ -293,7 +296,7 @@ export default function Streaming() {
// Periodically fetch Prometheus back-pressure from Meta node
const { response: promethusMetrics } = useFetch(
fetchPrometheusBackPressure,
INTERVAL,
INTERVAL_MS,
backPressureDataSource === "Prometheus"
)

Expand All @@ -312,10 +315,19 @@ export default function Streaming() {
? {
previous: prev.current,
current: newBP,
totalBackpressureNs: calculateCumulativeBp(
prev.totalBackpressureNs,
prev.current,
newBP
),
totalDurationNs:
prev.totalDurationNs + INTERVAL_MS * 1000 * 1000,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
totalBackpressureNs: [],
totalDurationNs: 0,
}
)
},
Expand All @@ -324,7 +336,7 @@ export default function Streaming() {
toast(e, "error")
}
)
}, INTERVAL)
}, INTERVAL_MS)
return () => {
clearInterval(interval)
}
Expand All @@ -337,9 +349,8 @@ export default function Streaming() {

if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) {
const metrics = calculateBPRate(
embeddedBackPressureInfo.current,
embeddedBackPressureInfo.previous,
INTERVAL
embeddedBackPressureInfo.totalBackpressureNs,
embeddedBackPressureInfo.totalDurationNs
)
for (const m of metrics.outputBufferBlockingDuration) {
map.set(
Expand Down

0 comments on commit 07501ab

Please sign in to comment.