From a62b4e6f069a6fe1a76fa6fcf08225cda98165a8 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 17 Sep 2024 11:27:44 +0800 Subject: [PATCH] exclude non-streaming relations from ddl graph --- dashboard/lib/api/streaming.ts | 2 +- dashboard/pages/ddl_graph.tsx | 202 ++++----------------------------- 2 files changed, 23 insertions(+), 181 deletions(-) diff --git a/dashboard/lib/api/streaming.ts b/dashboard/lib/api/streaming.ts index b0e6c8492ffd..f1a0e2b617f8 100644 --- a/dashboard/lib/api/streaming.ts +++ b/dashboard/lib/api/streaming.ts @@ -126,7 +126,7 @@ export async function getRelations() { return relations } -export async function getRelationDependencies(): Map { +export async function getRelationDependencies() { return await getObjectDependencies() } diff --git a/dashboard/pages/ddl_graph.tsx b/dashboard/pages/ddl_graph.tsx index 3638a95395a1..d6358a7023a0 100644 --- a/dashboard/pages/ddl_graph.tsx +++ b/dashboard/pages/ddl_graph.tsx @@ -47,7 +47,7 @@ import { import { getFragmentsByJobId, getRelationDependencies, getRelationIdInfos, - getStreamingJobs, Relation, + getStreamingJobs, Relation, StreamingJob, } from "../lib/api/streaming" import {DdlBox, FragmentBox} from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" @@ -116,57 +116,6 @@ const SampleDdlBackpressures: Map = new Map([ ["3_4", 0.4], ]) -function buildPlanNodeDependency( - fragment: TableFragments_Fragment -): d3.HierarchyNode { - const firstActor = fragment.actors[0] - - const hierarchyActorNode = (node: StreamNode): PlanNodeDatum => { - return { - name: node.nodeBody?.$case?.toString() || "unknown", - children: (node.input || []).map(hierarchyActorNode), - operatorId: node.operatorId, - node, - } - } - - let dispatcherName: string - - if (firstActor.dispatcher.length > 0) { - const firstDispatcherName = _.camelCase( - firstActor.dispatcher[0].type.replace(/^DISPATCHER_TYPE_/, "") - ) - if (firstActor.dispatcher.length > 1) { - if ( - firstActor.dispatcher.every( - (d) => d.type === firstActor.dispatcher[0].type - ) - ) { - dispatcherName = `${firstDispatcherName}Dispatchers` - } else { - dispatcherName = "multipleDispatchers" - } - } else { - dispatcherName = `${firstDispatcherName}Dispatcher` - } - } else { - dispatcherName = "noDispatcher" - } - - const dispatcherNode = fragment.actors.reduce((obj, actor) => { - obj[actor.actorId] = actor.dispatcher - return obj - }, {} as DispatcherNode) - - return d3.hierarchy({ - name: dispatcherName, - actorIds: fragment.actors.map((a) => a.actorId.toString()), - children: firstActor.nodes ? [hierarchyActorNode(firstActor.nodes)] : [], - operatorId: "dispatcher", - node: dispatcherNode, - }) -} - function findMergeNodes(root: StreamNode): MergeNode[] { let mergeNodes = new Set() @@ -226,18 +175,22 @@ function buildFragmentDependencyAsEdges( } function buildDdlDependencyAsEdges( - relations: Relation[], - dependencies: Map, + relations: StreamingJob[], ): DdlBox[] { + // Filter out non-streaming relations, e.g. source, views. + let relation_ids = new Set() + for (const relation of relations) { + relation_ids.add(relation.id) + } const nodes: DdlBox[] = [] for (const relation of relations) { - let parentIds = dependencies.get(relation.id) || [] + let parentIds = relation.dependentRelations nodes.push({ id: relation.id.toString(), order: relation.id, width: 0, height: 0, - parentIds: parentIds.map((x) => x.toString()), + parentIds: parentIds.filter((x) => relation_ids.has(x)).map((x) => x.toString()), ddl_name: relation.name, schema_name: relation.schemaName }) @@ -264,38 +217,23 @@ interface EmbeddedBackPressureInfo { export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) - const { response: relationIdInfos } = useFetch(getRelationIdInfos) - // 1. Get the relation dependendencies. const { response: relationDeps } = useFetch(getRelationDependencies) - // 2. Get the relation -> input fragment_id mapping. - // 3. Get the relation -> output fragment_id mapping. - // 4. Construct the BP graph for relation ids using 1-3. const [relationId, setRelationId] = useQueryState("id", parseAsInteger) - const [selectedFragmentId, setSelectedFragmentId] = useState() - const [tableFragments, setTableFragments] = useState() const toast = useErrorToast() - useEffect(() => { - if (relationId) { - setTableFragments(undefined) - getFragmentsByJobId(relationId).then((tf) => { - setTableFragments(tf) - }) - } - }, [relationId]) - - const fragmentDependencyCallback = useCallback(() => { - if (tableFragments) { - const fragmentDep = buildFragmentDependencyAsEdges(tableFragments) - return { - fragments: tableFragments, - fragmentDep, - fragmentDepDag: dagStratify()(fragmentDep), + const ddlDependencyCallback = useCallback(() => { + if (relationList) { + if (relationDeps) { + console.log(relationDeps) + const ddlDep = buildDdlDependencyAsEdges(relationList) + return { + ddlDep, + } } } - }, [tableFragments]) + }, [relationList, relationDeps]) useEffect(() => { if (relationList) { @@ -307,68 +245,8 @@ export default function Streaming() { } }, [relationId, relationList, setRelationId]) - const fragmentDependency = fragmentDependencyCallback()?.fragmentDep - const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag - const fragments = fragmentDependencyCallback()?.fragments - - const planNodeDependenciesCallback = useCallback(() => { - const fragments_ = fragments?.fragments - if (fragments_) { - const planNodeDependencies = new Map< - string, - d3.HierarchyNode - >() - for (const fragmentId in fragments_) { - const fragment = fragments_[fragmentId] - const dep = buildPlanNodeDependency(fragment) - planNodeDependencies.set(fragmentId, dep) - } - return planNodeDependencies - } - return undefined - }, [fragments?.fragments]) - - const planNodeDependencies = planNodeDependenciesCallback() - - const [searchActorId, setSearchActorId] = useState("") - const [searchFragId, setSearchFragId] = useState("") - - const handleSearchFragment = () => { - const searchFragIdInt = parseInt(searchFragId) - if (relationIdInfos) { - let map = relationIdInfos.map - for (const relationId in map) { - const fragmentIdToRelationId = map[relationId].map - for (const fragmentId in fragmentIdToRelationId) { - if (parseInt(fragmentId) == searchFragIdInt) { - setRelationId(parseInt(relationId)) - setSelectedFragmentId(searchFragIdInt) - return - } - } - } - } - toast(new Error(`Fragment ${searchFragIdInt} not found`)) - } - - const handleSearchActor = () => { - const searchActorIdInt = parseInt(searchActorId) - if (relationIdInfos) { - let map = relationIdInfos.map - for (const relationId in map) { - const fragmentIdToRelationId = map[relationId].map - for (const fragmentId in fragmentIdToRelationId) { - let actorIds = fragmentIdToRelationId[fragmentId].ids - if (actorIds.includes(searchActorIdInt)) { - setRelationId(parseInt(relationId)) - setSelectedFragmentId(parseInt(fragmentId)) - return - } - } - } - } - toast(new Error(`Actor ${searchActorIdInt} not found`)) - } + const ddlDependency = ddlDependencyCallback()?.ddlDep + console.log("ddlDependency: ", ddlDependency) const [backPressureDataSource, setBackPressureDataSource] = useState("Embedded") @@ -507,27 +385,6 @@ export default function Streaming() { ))} - - Goto - - - setSearchFragId(event.target.value)} - > - - - - setSearchActorId(event.target.value)} - > - - - - Back Pressure Data Source - - Fragments - {fragmentDependencyDag && ( - - - setSelectedFragmentId(parseInt(id)) - } - selectedId={selectedFragmentId?.toString()} - /> - - )} - Fragment Graph - {planNodeDependencies && fragmentDependency && ( + {ddlDependency && ( )}