Skip to content

Commit

Permalink
exclude non-streaming relations from ddl graph
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Sep 17, 2024
1 parent b5a6704 commit a62b4e6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 181 deletions.
2 changes: 1 addition & 1 deletion dashboard/lib/api/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export async function getRelations() {
return relations
}

export async function getRelationDependencies(): Map<number, number[]> {
export async function getRelationDependencies() {
return await getObjectDependencies()
}

Expand Down
202 changes: 22 additions & 180 deletions dashboard/pages/ddl_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import {
import {
getFragmentsByJobId, getRelationDependencies,
getRelationIdInfos,
getStreamingJobs, Relation,
getStreamingJobs, Relation, StreamingJob,
} from "../lib/api/streaming"
import {DdlBox, FragmentBox} from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
Expand Down Expand Up @@ -116,57 +116,6 @@ const SampleDdlBackpressures: Map<string, number> = new Map([
["3_4", 0.4],
])

function buildPlanNodeDependency(
fragment: TableFragments_Fragment
): d3.HierarchyNode<PlanNodeDatum> {
const firstActor = fragment.actors[0]

const hierarchyActorNode = (node: StreamNode): PlanNodeDatum => {
return {
name: node.nodeBody?.$case?.toString() || "unknown",
children: (node.input || []).map(hierarchyActorNode),
operatorId: node.operatorId,
node,
}
}

let dispatcherName: string

if (firstActor.dispatcher.length > 0) {
const firstDispatcherName = _.camelCase(
firstActor.dispatcher[0].type.replace(/^DISPATCHER_TYPE_/, "")
)
if (firstActor.dispatcher.length > 1) {
if (
firstActor.dispatcher.every(
(d) => d.type === firstActor.dispatcher[0].type
)
) {
dispatcherName = `${firstDispatcherName}Dispatchers`
} else {
dispatcherName = "multipleDispatchers"
}
} else {
dispatcherName = `${firstDispatcherName}Dispatcher`
}
} else {
dispatcherName = "noDispatcher"
}

const dispatcherNode = fragment.actors.reduce((obj, actor) => {
obj[actor.actorId] = actor.dispatcher
return obj
}, {} as DispatcherNode)

return d3.hierarchy({
name: dispatcherName,
actorIds: fragment.actors.map((a) => a.actorId.toString()),
children: firstActor.nodes ? [hierarchyActorNode(firstActor.nodes)] : [],
operatorId: "dispatcher",
node: dispatcherNode,
})
}

function findMergeNodes(root: StreamNode): MergeNode[] {
let mergeNodes = new Set<MergeNode>()

Expand Down Expand Up @@ -226,18 +175,22 @@ function buildFragmentDependencyAsEdges(
}

function buildDdlDependencyAsEdges(
relations: Relation[],
dependencies: Map<number, number[]>,
relations: StreamingJob[],
): DdlBox[] {
// Filter out non-streaming relations, e.g. source, views.
let relation_ids = new Set<number>()
for (const relation of relations) {
relation_ids.add(relation.id)
}
const nodes: DdlBox[] = []
for (const relation of relations) {
let parentIds = dependencies.get(relation.id) || []
let parentIds = relation.dependentRelations
nodes.push({
id: relation.id.toString(),
order: relation.id,
width: 0,
height: 0,
parentIds: parentIds.map((x) => x.toString()),
parentIds: parentIds.filter((x) => relation_ids.has(x)).map((x) => x.toString()),
ddl_name: relation.name,
schema_name: relation.schemaName
})
Expand All @@ -264,38 +217,23 @@ interface EmbeddedBackPressureInfo {

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: relationIdInfos } = useFetch(getRelationIdInfos)
// 1. Get the relation dependendencies.
const { response: relationDeps } = useFetch(getRelationDependencies)
// 2. Get the relation -> input fragment_id mapping.
// 3. Get the relation -> output fragment_id mapping.
// 4. Construct the BP graph for relation ids using 1-3.

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
const [tableFragments, setTableFragments] = useState<TableFragments>()

const toast = useErrorToast()

useEffect(() => {
if (relationId) {
setTableFragments(undefined)
getFragmentsByJobId(relationId).then((tf) => {
setTableFragments(tf)
})
}
}, [relationId])

const fragmentDependencyCallback = useCallback(() => {
if (tableFragments) {
const fragmentDep = buildFragmentDependencyAsEdges(tableFragments)
return {
fragments: tableFragments,
fragmentDep,
fragmentDepDag: dagStratify()(fragmentDep),
const ddlDependencyCallback = useCallback(() => {
if (relationList) {
if (relationDeps) {
console.log(relationDeps)
const ddlDep = buildDdlDependencyAsEdges(relationList)
return {
ddlDep,
}
}
}
}, [tableFragments])
}, [relationList, relationDeps])

useEffect(() => {
if (relationList) {
Expand All @@ -307,68 +245,8 @@ export default function Streaming() {
}
}, [relationId, relationList, setRelationId])

const fragmentDependency = fragmentDependencyCallback()?.fragmentDep
const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag
const fragments = fragmentDependencyCallback()?.fragments

const planNodeDependenciesCallback = useCallback(() => {
const fragments_ = fragments?.fragments
if (fragments_) {
const planNodeDependencies = new Map<
string,
d3.HierarchyNode<PlanNodeDatum>
>()
for (const fragmentId in fragments_) {
const fragment = fragments_[fragmentId]
const dep = buildPlanNodeDependency(fragment)
planNodeDependencies.set(fragmentId, dep)
}
return planNodeDependencies
}
return undefined
}, [fragments?.fragments])

const planNodeDependencies = planNodeDependenciesCallback()

const [searchActorId, setSearchActorId] = useState<string>("")
const [searchFragId, setSearchFragId] = useState<string>("")

const handleSearchFragment = () => {
const searchFragIdInt = parseInt(searchFragId)
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
if (parseInt(fragmentId) == searchFragIdInt) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(searchFragIdInt)
return
}
}
}
}
toast(new Error(`Fragment ${searchFragIdInt} not found`))
}

const handleSearchActor = () => {
const searchActorIdInt = parseInt(searchActorId)
if (relationIdInfos) {
let map = relationIdInfos.map
for (const relationId in map) {
const fragmentIdToRelationId = map[relationId].map
for (const fragmentId in fragmentIdToRelationId) {
let actorIds = fragmentIdToRelationId[fragmentId].ids
if (actorIds.includes(searchActorIdInt)) {
setRelationId(parseInt(relationId))
setSelectedFragmentId(parseInt(fragmentId))
return
}
}
}
}
toast(new Error(`Actor ${searchActorIdInt} not found`))
}
const ddlDependency = ddlDependencyCallback()?.ddlDep
console.log("ddlDependency: ", ddlDependency)

const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")
Expand Down Expand Up @@ -507,27 +385,6 @@ export default function Streaming() {
))}
</Select>
</FormControl>
<FormControl>
<FormLabel>Goto</FormLabel>
<VStack spacing={2}>
<HStack>
<Input
placeholder="Fragment Id"
value={searchFragId}
onChange={(event) => setSearchFragId(event.target.value)}
></Input>
<Button onClick={(_) => handleSearchFragment()}>Go</Button>
</HStack>
<HStack>
<Input
placeholder="Actor Id"
value={searchActorId}
onChange={(event) => setSearchActorId(event.target.value)}
></Input>
<Button onClick={(_) => handleSearchActor()}>Go</Button>
</HStack>
</VStack>
</FormControl>
<FormControl>
<FormLabel>Back Pressure Data Source</FormLabel>
<Select
Expand All @@ -547,21 +404,6 @@ export default function Streaming() {
</option>
</Select>
</FormControl>
<Flex height="full" width="full" flexDirection="column">
<Text fontWeight="semibold">Fragments</Text>
{fragmentDependencyDag && (
<Box flex="1" overflowY="scroll">
<FragmentDependencyGraph
svgWidth={SIDEBAR_WIDTH}
fragmentDependency={fragmentDependencyDag}
onSelectedIdChange={(id) =>
setSelectedFragmentId(parseInt(id))
}
selectedId={selectedFragmentId?.toString()}
/>
</Box>
)}
</Flex>
</VStack>
<Box
flex={1}
Expand All @@ -571,9 +413,9 @@ export default function Streaming() {
overflowY="scroll"
>
<Text fontWeight="semibold">Fragment Graph</Text>
{planNodeDependencies && fragmentDependency && (
{ddlDependency && (
<DdlGraph
ddlDependency={SampleDdlDependencyGraph}
ddlDependency={ddlDependency}
backPressures={SampleDdlBackpressures}
/>
)}
Expand Down

0 comments on commit a62b4e6

Please sign in to comment.