diff --git a/Project.toml b/Project.toml index 57d6a95..d510851 100644 --- a/Project.toml +++ b/Project.toml @@ -7,6 +7,7 @@ version = "0.2.1" AlgebraicPetri = "4f99eebe-17bf-4e98-b6a1-2c4f205a959b" AutoHashEquals = "15f4f7f2-30c1-5605-9d31-71845cf9641f" Catlab = "134e5e36-593f-5add-ad60-77f754baafbe" +Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" LibPQ = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1" @@ -14,6 +15,7 @@ LibPQ = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1" AlgebraicPetri = "0.5.1" AutoHashEquals = "0.2.0" Catlab = "^0.9" +Dagger = "0.10.2" DataFrames = "0.21.0" LibPQ = "1.4.0" julia = "1.0" diff --git a/src/Presentations.jl b/src/Presentations.jl index a778db7..5bbf4cb 100644 --- a/src/Presentations.jl +++ b/src/Presentations.jl @@ -8,12 +8,15 @@ module Presentations using Catlab.Present: Presentation using Catlab.Theories.FreeSchema: Attr, Data using AlgebraicPetri + using Catlab.WiringDiagrams.DirectedWiringDiagrams import Catlab.Theories: FreeSymmetricMonoidalCategory, ⊗ import Catlab.Programs: @program + using Catlab.CategoricalAlgebra + using Dagger export present_to_schema, @program, draw_workflow, FreeSymmetricMonoidalCategory, add_types!, add_type!, add_process!, add_processes!, Presentation, ⊗, - draw_schema + draw_schema, evaluate Presentation() = Presentation(FreeSymmetricMonoidalCategory) @@ -112,4 +115,47 @@ module Presentations function draw_workflow(p; kw...) to_graphviz(p; orientation=LeftToRight, kw...) end + + function evaluate(dwd::WiringDiagram, funcs::Dict{Symbol, <:Function}, + input_vals::Array) + graph = dwd.graph + g_inputs = [wire.source.port for wire in subpart(graph, incident(graph, 1, :src), + :wire)] + g_outputs = incident(graph, 2, :tgt) + g_out_ports = [wire.target.port for wire in subpart(graph,g_outputs,:wire)] + g_out_ports[g_out_ports] .= g_outputs + + values = Array{Thunk, 1}(undef, nparts(graph, :E)) + values[incident(graph, 1, :src)] .= [delayed(x->x)(input_vals[i]) for i in g_inputs] + + evaluated = fill(false, nparts(graph, :V)) + available = fill(false, nparts(graph, :E)) + evaluated[1:2] .= true + available[incident(graph, 1, :src)] .= true + + while !all(evaluated) + mod = false; + for i in 3:nparts(graph, :V) + inputs = incident(graph, i, :tgt) + outputs = incident(graph, i, :src) + out_ports = [wire.source.port for wire in subpart(graph, + outputs, + :wire)] + in_ports = [wire.target.port for wire in subpart(graph,inputs,:wire)] + in_ports[in_ports] .= inputs + if !evaluated[i] && all(available[inputs]) + func = delayed(funcs[subpart(graph, i, :box).value])(values[in_ports]...) + values[outputs] .= [delayed(x->x[i])(func) for i in out_ports] + evaluated[i] = true + available[outputs] .= true + mod = true; + end + end + if !mod + error("Not all boxes are able to be evaluated") + end + end + return collect(delayed((x...)->x)(values[g_out_ports]...)) + end + end diff --git a/test/Presentations.jl b/test/Presentations.jl index b0a711d..60b345a 100644 --- a/test/Presentations.jl +++ b/test/Presentations.jl @@ -13,10 +13,10 @@ Files, Images, NeuralNet, (:Metadata, String)]); # Add Processes to workflow -extract, split_im, train, evaluate = add_processes!(wf, [(:extract, Files, Images), +extract, split_im, train, test_net = add_processes!(wf, [(:extract, Files, Images), (:split_im, Images, Images⊗Images), (:train, NeuralNet⊗Images, NeuralNet⊗Metadata), - (:evaluate, NeuralNet⊗Images, Accuracy⊗Metadata)]); + (:test_net, NeuralNet⊗Images, Accuracy⊗Metadata)]); # Convert to Schema TrainDB = present_to_schema(wf); g = draw_schema(wf) @@ -24,3 +24,64 @@ g = draw_schema(wf) @test wf isa Catlab.Present.Presentation @test TrainDB <: Catlab.CategoricalAlgebra.ACSet @test g isa Catlab.Graphics.Graphviz.Graph + +@testset "Scheduling with Real Math" begin +math = Presentation() + +# Add Products to workflow +Num, = add_types!(math, [(:Num, Int)]); + +# Add Processes to workflow +add, sub, mult, div = add_processes!(math, [(:add, Num⊗Num, Num), + (:sub, Num⊗Num, Num), + (:mult, Num⊗Num, Num), + (:div, Num⊗Num, Num)]); +opers = @program math (x::Num, y::Num, z::Num) begin + x_p_y = add(x,y) + total = add(x_p_y, z) + div_z = div(total, z) + return div_z +end; + +@test evaluate(opers, Dict(:add=> +, :sub=> -, :div => /, :mult => *), [1,2,3])[1] == 2.0 +end + +@testset "Scheduling with Imaginary Math" begin +im_math = Presentation() + +# Add Products to workflow +Num, = add_types!(im_math, [(:Num, Int)]); + +# Add Processes to workflow +add, sub, mult, div, neg = add_processes!(im_math, [(:add, Num⊗Num⊗Num⊗Num, Num⊗Num), + (:sub, Num⊗Num⊗Num⊗Num, Num⊗Num), + (:mult, Num⊗Num⊗Num⊗Num, Num⊗Num), + (:div, Num⊗Num⊗Num⊗Num, Num⊗Num), + (:sc_div, Num⊗Num⊗Num, Num⊗Num), + (:neg, Num, Num)]); + +im_add(xr, xi, yr, yi) = (xr+yr, xi+yi) +im_sub(xr, xi, yr, yi) = (xr-yr, xi-yi) +im_mult(xr, xi, yr, yi) = (xr*yr - xi*yi, xr*yi+xi*yr) +sc_div(xr, xi, c) = (xr/c, xi/c) +sc_neg(x) = -x +im_div_p = @program im_math (xr::Num, xi::Num, yr::Num, yi::Num) begin + numr, numi = mult(xr, xi, yr, neg(yi)) + denr, deni = mult(yr, yi, yr, neg(yi)) + return sc_div(numr, numi, denr) +end +func_map = Dict(:add=>im_add, :sub=>im_sub, :mult=>im_mult, :sc_div=>sc_div, :neg=>sc_neg) +im_div(xr, xi, yr, yi) = evaluate(im_div_p, func_map, [xr, xi, yr, yi]) +func_map[:div] = im_div + +im_test = @program im_math (xr::Num, xi::Num, yr::Num, yi::Num) begin + sumr, sumi = add(xr, xi, yr, yi) + swxr, swxi = div(xi, xr, sumi, xr) + swyr, swyi = mult(yi, yr, sumr, yr) + allsumr, allsumi = add(sumi, sumi, sumi, sumi) + return swxr, swxi, swyr, swyi +end + +@test evaluate(im_test, func_map, [2, 2, 3, 4]) == (0.4, 0.2, 11, 27) + +end