Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduling Workflows #17

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ version = "0.2.1"
AlgebraicPetri = "4f99eebe-17bf-4e98-b6a1-2c4f205a959b"
AutoHashEquals = "15f4f7f2-30c1-5605-9d31-71845cf9641f"
Catlab = "134e5e36-593f-5add-ad60-77f754baafbe"
Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
LibPQ = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1"

[compat]
AlgebraicPetri = "0.5.1"
AutoHashEquals = "0.2.0"
Catlab = "^0.9"
Dagger = "0.10.2"
DataFrames = "0.21.0"
LibPQ = "1.4.0"
julia = "1.0"
Expand Down
48 changes: 47 additions & 1 deletion src/Presentations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ module Presentations
using Catlab.Present: Presentation
using Catlab.Theories.FreeSchema: Attr, Data
using AlgebraicPetri
using Catlab.WiringDiagrams.DirectedWiringDiagrams
import Catlab.Theories: FreeSymmetricMonoidalCategory, ⊗
import Catlab.Programs: @program
using Catlab.CategoricalAlgebra
using Dagger

export present_to_schema, @program, draw_workflow, FreeSymmetricMonoidalCategory,
add_types!, add_type!, add_process!, add_processes!, Presentation, ⊗,
draw_schema
draw_schema, evaluate

Presentation() = Presentation(FreeSymmetricMonoidalCategory)

Expand Down Expand Up @@ -112,4 +115,47 @@ module Presentations
function draw_workflow(p; kw...)
to_graphviz(p; orientation=LeftToRight, kw...)
end

function evaluate(dwd::WiringDiagram, funcs::Dict{Symbol, <:Function},
input_vals::Array)
graph = dwd.graph
g_inputs = [wire.source.port for wire in subpart(graph, incident(graph, 1, :src),
:wire)]
g_outputs = incident(graph, 2, :tgt)
g_out_ports = [wire.target.port for wire in subpart(graph,g_outputs,:wire)]
g_out_ports[g_out_ports] .= g_outputs

values = Array{Thunk, 1}(undef, nparts(graph, :E))
values[incident(graph, 1, :src)] .= [delayed(x->x)(input_vals[i]) for i in g_inputs]

evaluated = fill(false, nparts(graph, :V))
available = fill(false, nparts(graph, :E))
evaluated[1:2] .= true
available[incident(graph, 1, :src)] .= true

while !all(evaluated)
mod = false;
for i in 3:nparts(graph, :V)
inputs = incident(graph, i, :tgt)
outputs = incident(graph, i, :src)
out_ports = [wire.source.port for wire in subpart(graph,
outputs,
:wire)]
in_ports = [wire.target.port for wire in subpart(graph,inputs,:wire)]
in_ports[in_ports] .= inputs
if !evaluated[i] && all(available[inputs])
func = delayed(funcs[subpart(graph, i, :box).value])(values[in_ports]...)
values[outputs] .= [delayed(x->x[i])(func) for i in out_ports]
evaluated[i] = true
available[outputs] .= true
mod = true;
end
end
if !mod
error("Not all boxes are able to be evaluated")
end
end
return collect(delayed((x...)->x)(values[g_out_ports]...))
end

end
65 changes: 63 additions & 2 deletions test/Presentations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,75 @@ Files, Images, NeuralNet,
(:Metadata, String)]);

# Add Processes to workflow
extract, split_im, train, evaluate = add_processes!(wf, [(:extract, Files, Images),
extract, split_im, train, test_net = add_processes!(wf, [(:extract, Files, Images),
(:split_im, Images, Images⊗Images),
(:train, NeuralNet⊗Images, NeuralNet⊗Metadata),
(:evaluate, NeuralNet⊗Images, Accuracy⊗Metadata)]);
(:test_net, NeuralNet⊗Images, Accuracy⊗Metadata)]);
# Convert to Schema
TrainDB = present_to_schema(wf);
g = draw_schema(wf)

@test wf isa Catlab.Present.Presentation
@test TrainDB <: Catlab.CategoricalAlgebra.ACSet
@test g isa Catlab.Graphics.Graphviz.Graph

@testset "Scheduling with Real Math" begin
math = Presentation()

# Add Products to workflow
Num, = add_types!(math, [(:Num, Int)]);

# Add Processes to workflow
add, sub, mult, div = add_processes!(math, [(:add, Num⊗Num, Num),
(:sub, Num⊗Num, Num),
(:mult, Num⊗Num, Num),
(:div, Num⊗Num, Num)]);
opers = @program math (x::Num, y::Num, z::Num) begin
x_p_y = add(x,y)
total = add(x_p_y, z)
div_z = div(total, z)
return div_z
end;

@test evaluate(opers, Dict(:add=> +, :sub=> -, :div => /, :mult => *), [1,2,3])[1] == 2.0
end

@testset "Scheduling with Imaginary Math" begin
im_math = Presentation()

# Add Products to workflow
Num, = add_types!(im_math, [(:Num, Int)]);

# Add Processes to workflow
add, sub, mult, div, neg = add_processes!(im_math, [(:add, Num⊗Num⊗Num⊗Num, Num⊗Num),
(:sub, Num⊗Num⊗Num⊗Num, Num⊗Num),
(:mult, Num⊗Num⊗Num⊗Num, Num⊗Num),
(:div, Num⊗Num⊗Num⊗Num, Num⊗Num),
(:sc_div, Num⊗Num⊗Num, Num⊗Num),
(:neg, Num, Num)]);

im_add(xr, xi, yr, yi) = (xr+yr, xi+yi)
im_sub(xr, xi, yr, yi) = (xr-yr, xi-yi)
im_mult(xr, xi, yr, yi) = (xr*yr - xi*yi, xr*yi+xi*yr)
sc_div(xr, xi, c) = (xr/c, xi/c)
sc_neg(x) = -x
im_div_p = @program im_math (xr::Num, xi::Num, yr::Num, yi::Num) begin
numr, numi = mult(xr, xi, yr, neg(yi))
denr, deni = mult(yr, yi, yr, neg(yi))
return sc_div(numr, numi, denr)
end
func_map = Dict(:add=>im_add, :sub=>im_sub, :mult=>im_mult, :sc_div=>sc_div, :neg=>sc_neg)
im_div(xr, xi, yr, yi) = evaluate(im_div_p, func_map, [xr, xi, yr, yi])
func_map[:div] = im_div

im_test = @program im_math (xr::Num, xi::Num, yr::Num, yi::Num) begin
sumr, sumi = add(xr, xi, yr, yi)
swxr, swxi = div(xi, xr, sumi, xr)
swyr, swyi = mult(yi, yr, sumr, yr)
allsumr, allsumi = add(sumi, sumi, sumi, sumi)
return swxr, swxi, swyr, swyi
end

@test evaluate(im_test, func_map, [2, 2, 3, 4]) == (0.4, 0.2, 11, 27)

end