diff --git a/dashboard/components/DdlGraph.tsx b/dashboard/components/DdlGraph.tsx new file mode 100644 index 0000000000000..f0d30c2710a89 --- /dev/null +++ b/dashboard/components/DdlGraph.tsx @@ -0,0 +1,430 @@ +import { + Button, + Modal, + ModalBody, + ModalCloseButton, + ModalContent, + ModalFooter, + ModalHeader, + ModalOverlay, + theme, + useDisclosure, +} from "@chakra-ui/react" +import { tinycolor } from "@ctrl/tinycolor" +import loadable from "@loadable/component" +import * as d3 from "d3" +import { cloneDeep } from "lodash" +import { Fragment, useCallback, useEffect, useRef, useState } from "react" +import { + Edge, + Enter, + FragmentBox, + FragmentBoxPosition, + Position, + generateFragmentEdges, + layoutItem, +} from "../lib/layout" +import { PlanNodeDatum } from "../pages/fragment_graph" +import { StreamNode } from "../proto/gen/stream_plan" + +const ReactJson = loadable(() => import("react-json-view")) + +type FragmentLayout = { + id: string + layoutRoot: d3.HierarchyPointNode + width: number + height: number + actorIds: string[] +} & Position + +function treeLayoutFlip( + root: d3.HierarchyNode, + { dx, dy }: { dx: number; dy: number } +): d3.HierarchyPointNode { + const tree = d3.tree().nodeSize([dy, dx]) + + // Flip x, y + const treeRoot = tree(root) + + // Flip back x, y + treeRoot.each((d: Position) => ([d.x, d.y] = [d.y, d.x])) + + // LTR -> RTL + treeRoot.each((d: Position) => (d.x = -d.x)) + + return treeRoot +} + +function boundBox( + root: d3.HierarchyPointNode, + { + margin: { top, bottom, left, right }, + }: { margin: { top: number; bottom: number; left: number; right: number } } +): { width: number; height: number } { + let x0 = Infinity + let x1 = -x0 + let y0 = Infinity + let y1 = -y0 + + root.each((d) => (x1 = d.x > x1 ? d.x : x1)) + root.each((d) => (x0 = d.x < x0 ? d.x : x0)) + root.each((d) => (y1 = d.y > y1 ? d.y : y1)) + root.each((d) => (y0 = d.y < y0 ? d.y : y0)) + + x0 -= left + x1 += right + y0 -= top + y1 += bottom + + root.each((d) => (d.x = d.x - x0)) + root.each((d) => (d.y = d.y - y0)) + + return { width: x1 - x0, height: y1 - y0 } +} + +const nodeRadius = 12 +const nodeMarginX = nodeRadius * 6 +const nodeMarginY = nodeRadius * 4 +const fragmentMarginX = nodeRadius * 2 +const fragmentMarginY = nodeRadius * 2 +const fragmentDistanceX = nodeRadius * 2 +const fragmentDistanceY = nodeRadius * 2 + +export default function DdlGraph({ + planNodeDependencies, + fragmentDependency, + selectedFragmentId, + backPressures, +}: { + planNodeDependencies: Map> + fragmentDependency: FragmentBox[] // This is just the layout info. + selectedFragmentId?: string // change to selected ddl id. + backPressures?: Map // relation_id -> relation_id: back pressure rate +}) { + const svgRef = useRef(null) + + const { isOpen, onOpen, onClose } = useDisclosure() + const [currentStreamNode, setCurrentStreamNode] = useState() + + const openPlanNodeDetail = useCallback( + (node: PlanNodeDatum) => { + setCurrentStreamNode(node) + onOpen() + }, + [onOpen, setCurrentStreamNode] + ) + + const planNodeDependencyDagCallback = useCallback(() => { + const deps = cloneDeep(planNodeDependencies) + const fragmentDependencyDag = cloneDeep(fragmentDependency) + + const layoutFragmentResult = new Map() + const includedFragmentIds = new Set() + for (const [fragmentId, fragmentRoot] of deps) { + const layoutRoot = treeLayoutFlip(fragmentRoot, { + dx: nodeMarginX, + dy: nodeMarginY, + }) + let { width, height } = boundBox(layoutRoot, { + margin: { + left: nodeRadius * 4 + fragmentMarginX, + right: nodeRadius * 4 + fragmentMarginX, + top: nodeRadius * 3 + fragmentMarginY, + bottom: nodeRadius * 4 + fragmentMarginY, + }, + }) + layoutFragmentResult.set(fragmentId, { + layoutRoot, + width, + height, + actorIds: fragmentRoot.data.actorIds ?? [], + }) + includedFragmentIds.add(fragmentId) + } + + const fragmentLayout = layoutItem( + fragmentDependencyDag.map(({ width: _1, height: _2, id, ...data }) => { + const { width, height } = layoutFragmentResult.get(id)! + return { width, height, id, ...data } + }), + fragmentDistanceX, + fragmentDistanceY + ) + const fragmentLayoutPosition = new Map() + fragmentLayout.forEach(({ id, x, y }: FragmentBoxPosition) => { + fragmentLayoutPosition.set(id, { x, y }) + }) + + const layoutResult: FragmentLayout[] = [] + for (const [fragmentId, result] of layoutFragmentResult) { + const { x, y } = fragmentLayoutPosition.get(fragmentId)! + layoutResult.push({ id: fragmentId, x, y, ...result }) + } + + let svgWidth = 0 + let svgHeight = 0 + layoutResult.forEach(({ x, y, width, height }) => { + svgHeight = Math.max(svgHeight, y + height + 50) + svgWidth = Math.max(svgWidth, x + width) + }) + const edges = generateFragmentEdges(fragmentLayout) + + return { + layoutResult, + svgWidth, + svgHeight, + edges, + } + }, [planNodeDependencies, fragmentDependency]) + + const { + svgWidth, + svgHeight, + edges: fragmentEdgeLayout, + layoutResult: fragmentLayout, + } = planNodeDependencyDagCallback() + + useEffect(() => { + if (fragmentLayout) { + const svgNode = svgRef.current + const svgSelection = d3.select(svgNode) + + const isSelected = (id: string) => id === selectedFragmentId + + // Fragments + const applyFragment = (gSel: FragmentSelection) => { + gSel.attr("transform", ({ x, y }) => `translate(${x}, ${y})`) + + // Fragment text line 1 (fragment id) + let text = gSel.select(".text-frag-id") + if (text.empty()) { + text = gSel.append("text").attr("class", "text-frag-id") + } + + text + .attr("fill", "black") + .text(({ id }) => `Fragment ${id}`) + .attr("font-family", "inherit") + .attr("text-anchor", "end") + .attr("dy", ({ height }) => height - fragmentMarginY + 12) + .attr("dx", ({ width }) => width - fragmentMarginX) + .attr("fill", "black") + .attr("font-size", 12) + + // Fragment text line 2 (actor ids) + let text2 = gSel.select(".text-actor-id") + if (text2.empty()) { + text2 = gSel.append("text").attr("class", "text-actor-id") + } + + text2 + .attr("fill", "black") + .text(({ actorIds }) => `Actor ${actorIds.join(", ")}`) + .attr("font-family", "inherit") + .attr("text-anchor", "end") + .attr("dy", ({ height }) => height - fragmentMarginY + 24) + .attr("dx", ({ width }) => width - fragmentMarginX) + .attr("fill", "black") + .attr("font-size", 12) + + // Fragment bounding box + let boundingBox = gSel.select(".bounding-box") + if (boundingBox.empty()) { + boundingBox = gSel.append("rect").attr("class", "bounding-box") + } + + boundingBox + .attr("width", ({ width }) => width - fragmentMarginX * 2) + .attr("height", ({ height }) => height - fragmentMarginY * 2) + .attr("x", fragmentMarginX) + .attr("y", fragmentMarginY) + .attr("fill", "white") + .attr("stroke-width", ({ id }) => (isSelected(id) ? 3 : 1)) + .attr("rx", 5) + .attr("stroke", ({ id }) => + isSelected(id) ? theme.colors.blue[500] : theme.colors.gray[500] + ) + } + + const createFragment = (sel: Enter) => + sel.append("g").attr("class", "fragment").call(applyFragment) + + const fragmentSelection = svgSelection + .select(".fragments") + .selectAll(".fragment") + .data(fragmentLayout) + type FragmentSelection = typeof fragmentSelection + + fragmentSelection.enter().call(createFragment) + fragmentSelection.call(applyFragment) + fragmentSelection.exit().remove() + + // Fragment Edges + const edgeSelection = svgSelection + .select(".fragment-edges") + .selectAll(".fragment-edge") + .data(fragmentEdgeLayout) + type EdgeSelection = typeof edgeSelection + + const curveStyle = d3.curveMonotoneX + + const line = d3 + .line() + .curve(curveStyle) + .x(({ x }) => x) + .y(({ y }) => y) + + const applyEdge = (gSel: EdgeSelection) => { + // Edge line + let path = gSel.select("path") + if (path.empty()) { + path = gSel.append("path") + } + + const isEdgeSelected = (d: Edge) => + isSelected(d.source) || isSelected(d.target) + + const color = (d: Edge) => { + if (backPressures) { + let value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + return backPressureColor(value) + } + } + + return isEdgeSelected(d) + ? theme.colors.blue["500"] + : theme.colors.gray["300"] + } + + const width = (d: Edge) => { + if (backPressures) { + let value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + return backPressureWidth(value) + } + } + + return isEdgeSelected(d) ? 4 : 2 + } + + path + .attr("d", ({ points }) => line(points)) + .attr("fill", "none") + .attr("stroke-width", width) + .attr("stroke", color) + + // Tooltip for back pressure rate + let title = gSel.select("title") + if (title.empty()) { + title = gSel.append("title") + } + + const text = (d: Edge) => { + if (backPressures) { + let value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + return `${value.toFixed(2)}%` + } + } + + return "" + } + + title.text(text) + + return gSel + } + const createEdge = (sel: Enter) => + sel.append("g").attr("class", "fragment-edge").call(applyEdge) + + edgeSelection.enter().call(createEdge) + edgeSelection.call(applyEdge) + edgeSelection.exit().remove() + } + }, [ + fragmentLayout, + fragmentEdgeLayout, + backPressures, + selectedFragmentId, + openPlanNodeDetail, + ]) + + return ( + + + + + + {currentStreamNode?.operatorId} - {currentStreamNode?.name} + + + + {isOpen && currentStreamNode?.node && ( + + name === "input" || name === "fields" || name === "streamKey" + } // collapse top-level fields for better readability + src={currentStreamNode.node} + collapsed={3} + name={null} + displayDataTypes={false} + /> + )} + + + + + + + + + + + + + ) +} + +/** + * The color for the edge with given back pressure value. + * + * @param value The back pressure rate, between 0 and 100. + */ +function backPressureColor(value: number) { + const colorRange = [ + theme.colors.green["100"], + theme.colors.green["300"], + theme.colors.yellow["400"], + theme.colors.orange["500"], + theme.colors.red["700"], + ].map((c) => tinycolor(c)) + + value = Math.max(value, 0) + value = Math.min(value, 100) + + const step = colorRange.length - 1 + const pos = (value / 100) * step + const floor = Math.floor(pos) + const ceil = Math.ceil(pos) + + const color = tinycolor(colorRange[floor]) + .mix(tinycolor(colorRange[ceil]), (pos - floor) * 100) + .toHexString() + + return color +} + +/** + * The width for the edge with given back pressure value. + * + * @param value The back pressure rate, between 0 and 100. + */ +function backPressureWidth(value: number) { + value = Math.max(value, 0) + value = Math.min(value, 100) + + return 30 * (value / 100) + 2 +} diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx index 6daa7e821ce3c..e229ca13a2f01 100644 --- a/dashboard/components/Layout.tsx +++ b/dashboard/components/Layout.tsx @@ -146,6 +146,7 @@ function Layout({ children }: { children: React.ReactNode }) { Streaming Dependency Graph Fragment Graph + Ddl Graph
Batch diff --git a/dashboard/lib/layout.ts b/dashboard/lib/layout.ts index ca96325e9802f..421157eaadac8 100644 --- a/dashboard/lib/layout.ts +++ b/dashboard/lib/layout.ts @@ -288,6 +288,7 @@ export interface LayoutItemBase { export type FragmentBox = LayoutItemBase & { name: string + // Upstream Fragment Ids. externalParentIds: string[] fragment?: TableFragments_Fragment } diff --git a/dashboard/pages/ddl_graph.tsx b/dashboard/pages/ddl_graph.tsx new file mode 100644 index 0000000000000..d28f00777be6b --- /dev/null +++ b/dashboard/pages/ddl_graph.tsx @@ -0,0 +1,528 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + Box, + Button, + Flex, + FormControl, + FormLabel, + HStack, + Input, + Select, + Text, + VStack, +} from "@chakra-ui/react" +import * as d3 from "d3" +import { dagStratify } from "d3-dag" +import _ from "lodash" +import Head from "next/head" +import { parseAsInteger, useQueryState } from "nuqs" +import { Fragment, useCallback, useEffect, useMemo, useState } from "react" +import FragmentDependencyGraph from "../components/FragmentDependencyGraph" +import DdlGraph from "../components/DdlGraph" +import Title from "../components/Title" +import useErrorToast from "../hook/useErrorToast" +import useFetch from "../lib/api/fetch" +import { + calculateBPRate, + calculateCumulativeBp, + fetchEmbeddedBackPressure, + fetchPrometheusBackPressure, +} from "../lib/api/metric" +import { + getFragmentsByJobId, getRelationDependencies, + getRelationIdInfos, + getStreamingJobs, +} from "../lib/api/streaming" +import { FragmentBox } from "../lib/layout" +import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" +import { BackPressureInfo } from "../proto/gen/monitor_service" +import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" + +interface DispatcherNode { + [actorId: number]: Dispatcher[] +} + +// Refresh interval (ms) for back pressure stats +const INTERVAL_MS = 5000 + +/** Associated data of each plan node in the fragment graph, including the dispatchers. */ +export interface PlanNodeDatum { + name: string + children?: PlanNodeDatum[] + operatorId: string | number + node: StreamNode | DispatcherNode + actorIds?: string[] +} + +function buildPlanNodeDependency( + fragment: TableFragments_Fragment +): d3.HierarchyNode { + const firstActor = fragment.actors[0] + + const hierarchyActorNode = (node: StreamNode): PlanNodeDatum => { + return { + name: node.nodeBody?.$case?.toString() || "unknown", + children: (node.input || []).map(hierarchyActorNode), + operatorId: node.operatorId, + node, + } + } + + let dispatcherName: string + + if (firstActor.dispatcher.length > 0) { + const firstDispatcherName = _.camelCase( + firstActor.dispatcher[0].type.replace(/^DISPATCHER_TYPE_/, "") + ) + if (firstActor.dispatcher.length > 1) { + if ( + firstActor.dispatcher.every( + (d) => d.type === firstActor.dispatcher[0].type + ) + ) { + dispatcherName = `${firstDispatcherName}Dispatchers` + } else { + dispatcherName = "multipleDispatchers" + } + } else { + dispatcherName = `${firstDispatcherName}Dispatcher` + } + } else { + dispatcherName = "noDispatcher" + } + + const dispatcherNode = fragment.actors.reduce((obj, actor) => { + obj[actor.actorId] = actor.dispatcher + return obj + }, {} as DispatcherNode) + + return d3.hierarchy({ + name: dispatcherName, + actorIds: fragment.actors.map((a) => a.actorId.toString()), + children: firstActor.nodes ? [hierarchyActorNode(firstActor.nodes)] : [], + operatorId: "dispatcher", + node: dispatcherNode, + }) +} + +function findMergeNodes(root: StreamNode): MergeNode[] { + let mergeNodes = new Set() + + const findMergeNodesRecursive = (node: StreamNode) => { + if (node.nodeBody?.$case === "merge") { + mergeNodes.add(node.nodeBody.merge) + } + for (const child of node.input || []) { + findMergeNodesRecursive(child) + } + } + + findMergeNodesRecursive(root) + return Array.from(mergeNodes) +} + +function buildFragmentDependencyAsEdges( + fragments: TableFragments +): FragmentBox[] { + const nodes: FragmentBox[] = [] + const actorToFragmentMapping = new Map() + for (const fragmentId in fragments.fragments) { + const fragment = fragments.fragments[fragmentId] + for (const actor of fragment.actors) { + actorToFragmentMapping.set(actor.actorId, actor.fragmentId) + } + } + for (const id in fragments.fragments) { + const fragment = fragments.fragments[id] + const parentIds = new Set() + const externalParentIds = new Set() + + for (const actor of fragment.actors) { + for (const upstreamActorId of actor.upstreamActorId) { + const upstreamFragmentId = actorToFragmentMapping.get(upstreamActorId) + if (upstreamFragmentId) { + parentIds.add(upstreamFragmentId) + } else { + for (const m of findMergeNodes(actor.nodes!)) { + externalParentIds.add(m.upstreamFragmentId) + } + } + } + } + nodes.push({ + id: fragment.fragmentId.toString(), + name: `Fragment ${fragment.fragmentId}`, + parentIds: Array.from(parentIds).map((x) => x.toString()), + externalParentIds: Array.from(externalParentIds).map((x) => x.toString()), + width: 0, + height: 0, + order: fragment.fragmentId, + fragment, + }) + } + return nodes +} + +const SIDEBAR_WIDTH = 200 + +type BackPressureDataSource = "Embedded" | "Prometheus" +const backPressureDataSources: BackPressureDataSource[] = [ + "Embedded", + "Prometheus", +] + +// The state of the embedded back pressure metrics. +// The metrics from previous fetch are stored here to calculate the rate. +interface EmbeddedBackPressureInfo { + previous: BackPressureInfo[] + current: BackPressureInfo[] + totalBackpressureNs: BackPressureInfo[] + totalDurationNs: number +} + +export default function Streaming() { + const { response: relationList } = useFetch(getStreamingJobs) + const { response: relationIdInfos } = useFetch(getRelationIdInfos) + // 1. Get the relation dependendencies. + // const { response: relationDeps } = useFetch(getRelationDependencies) + // 2. Get the relation -> input fragment_id mapping. + // 3. Get the relation -> output fragment_id mapping. + // 4. Construct the BP graph for relation ids using 1-3. + + const [relationId, setRelationId] = useQueryState("id", parseAsInteger) + const [selectedFragmentId, setSelectedFragmentId] = useState() + const [tableFragments, setTableFragments] = useState() + + const toast = useErrorToast() + + useEffect(() => { + if (relationId) { + setTableFragments(undefined) + getFragmentsByJobId(relationId).then((tf) => { + setTableFragments(tf) + }) + } + }, [relationId]) + + const fragmentDependencyCallback = useCallback(() => { + if (tableFragments) { + const fragmentDep = buildFragmentDependencyAsEdges(tableFragments) + return { + fragments: tableFragments, + fragmentDep, + fragmentDepDag: dagStratify()(fragmentDep), + } + } + }, [tableFragments]) + + useEffect(() => { + if (relationList) { + if (!relationId) { + if (relationList.length > 0) { + setRelationId(relationList[0].id) + } + } + } + }, [relationId, relationList, setRelationId]) + + const fragmentDependency = fragmentDependencyCallback()?.fragmentDep + const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag + const fragments = fragmentDependencyCallback()?.fragments + + const planNodeDependenciesCallback = useCallback(() => { + const fragments_ = fragments?.fragments + if (fragments_) { + const planNodeDependencies = new Map< + string, + d3.HierarchyNode + >() + for (const fragmentId in fragments_) { + const fragment = fragments_[fragmentId] + const dep = buildPlanNodeDependency(fragment) + planNodeDependencies.set(fragmentId, dep) + } + return planNodeDependencies + } + return undefined + }, [fragments?.fragments]) + + const planNodeDependencies = planNodeDependenciesCallback() + + const [searchActorId, setSearchActorId] = useState("") + const [searchFragId, setSearchFragId] = useState("") + + const handleSearchFragment = () => { + const searchFragIdInt = parseInt(searchFragId) + if (relationIdInfos) { + let map = relationIdInfos.map + for (const relationId in map) { + const fragmentIdToRelationId = map[relationId].map + for (const fragmentId in fragmentIdToRelationId) { + if (parseInt(fragmentId) == searchFragIdInt) { + setRelationId(parseInt(relationId)) + setSelectedFragmentId(searchFragIdInt) + return + } + } + } + } + toast(new Error(`Fragment ${searchFragIdInt} not found`)) + } + + const handleSearchActor = () => { + const searchActorIdInt = parseInt(searchActorId) + if (relationIdInfos) { + let map = relationIdInfos.map + for (const relationId in map) { + const fragmentIdToRelationId = map[relationId].map + for (const fragmentId in fragmentIdToRelationId) { + let actorIds = fragmentIdToRelationId[fragmentId].ids + if (actorIds.includes(searchActorIdInt)) { + setRelationId(parseInt(relationId)) + setSelectedFragmentId(parseInt(fragmentId)) + return + } + } + } + } + toast(new Error(`Actor ${searchActorIdInt} not found`)) + } + + const [backPressureDataSource, setBackPressureDataSource] = + useState("Embedded") + + // Periodically fetch Prometheus back-pressure from Meta node + const { response: promethusMetrics } = useFetch( + fetchPrometheusBackPressure, + INTERVAL_MS, + backPressureDataSource === "Prometheus" + ) + + // Periodically fetch embedded back-pressure from Meta node + // Didn't call `useFetch()` because the `setState` way is special. + const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = + useState() + useEffect(() => { + if (backPressureDataSource === "Embedded") { + const interval = setInterval(() => { + fetchEmbeddedBackPressure().then( + (newBP) => { + console.log(newBP) + setEmbeddedBackPressureInfo((prev) => + prev + ? { + previous: prev.current, + current: newBP, + totalBackpressureNs: calculateCumulativeBp( + prev.totalBackpressureNs, + prev.current, + newBP + ), + totalDurationNs: + prev.totalDurationNs + INTERVAL_MS * 1000 * 1000, + } + : { + previous: newBP, // Use current value to show zero rate, but it's fine + current: newBP, + totalBackpressureNs: [], + totalDurationNs: 0, + } + ) + }, + (e) => { + console.error(e) + toast(e, "error") + } + ) + }, INTERVAL_MS) + return () => { + clearInterval(interval) + } + } + }, [backPressureDataSource, toast]) + + const backPressures = useMemo(() => { + if (promethusMetrics || embeddedBackPressureInfo) { + let map = new Map() + + if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) { + const metrics = calculateBPRate( + embeddedBackPressureInfo.totalBackpressureNs, + embeddedBackPressureInfo.totalDurationNs + ) + for (const m of metrics.outputBufferBlockingDuration) { + map.set( + `${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`, + m.sample[0].value + ) + } + } else if (backPressureDataSource === "Prometheus" && promethusMetrics) { + for (const m of promethusMetrics.outputBufferBlockingDuration) { + if (m.sample.length > 0) { + // Note: We issue an instant query to Prometheus to get the most recent value. + // So there should be only one sample here. + // + // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still + // possible that an old version of meta service returns a range-query result. + // So we take the one with the latest timestamp here. + const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) + } + } + } + return map + } + }, [backPressureDataSource, promethusMetrics, embeddedBackPressureInfo]) + + const retVal = ( + + Fragment Graph + + + + Relations + { + const id = relationList?.find( + (x) => x.name == event.target.value + )?.id + if (id) { + setRelationId(id) + } + }} + placeholder="Search..." + mb={2} + > + + {relationList && + relationList.map((r) => ( + + ))} + + + + + Goto + + + setSearchFragId(event.target.value)} + > + + + + setSearchActorId(event.target.value)} + > + + + + + + Back Pressure Data Source + + + + Fragments + {fragmentDependencyDag && ( + + + setSelectedFragmentId(parseInt(id)) + } + selectedId={selectedFragmentId?.toString()} + /> + + )} + + + + Fragment Graph + {planNodeDependencies && fragmentDependency && ( + + )} + + + + ) + + return ( + + + Streaming Fragments + + {retVal} + + ) +} diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 42098ff222858..23b4f04119c07 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -236,6 +236,7 @@ export default function Streaming() { } }, [relationId, relationList, setRelationId]) + // The table fragments of the selected fragment id const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag const fragments = fragmentDependencyCallback()?.fragments