From c765853616cafcc12de47742707f6fc4b3baeaeb Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 9 Apr 2024 01:08:41 +0200 Subject: [PATCH] Debug scheduler test --- .buildkite/pipeline.yml | 174 ++++++------ test/scheduler.jl | 585 ++++++++++++++++++++-------------------- 2 files changed, 376 insertions(+), 383 deletions(-) diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 05519c9bf..2061476a4 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -15,46 +15,46 @@ arch: x86_64 num_cpus: 16 steps: - - label: Julia 1.8 - timeout_in_minutes: 90 - <<: *test - plugins: - - JuliaCI/julia#v1: - version: "1.8" - - JuliaCI/julia-test#v1: - julia_args: "--threads=1" - - JuliaCI/julia-coverage#v1: - codecov: true - - label: Julia 1.9 - timeout_in_minutes: 90 - <<: *test - plugins: - - JuliaCI/julia#v1: - version: "1.9" - - JuliaCI/julia-test#v1: - julia_args: "--threads=1" - - JuliaCI/julia-coverage#v1: - codecov: true - - label: Julia 1.10 - timeout_in_minutes: 90 - <<: *test - plugins: - - JuliaCI/julia#v1: - version: "1.10" - - JuliaCI/julia-test#v1: - julia_args: "--threads=1" - - JuliaCI/julia-coverage#v1: - codecov: true - - label: Julia nightly - timeout_in_minutes: 90 - <<: *test - plugins: - - JuliaCI/julia#v1: - version: "1.10-nightly" - - JuliaCI/julia-test#v1: - julia_args: "--threads=1" - - JuliaCI/julia-coverage#v1: - codecov: true + # - label: Julia 1.8 + # timeout_in_minutes: 90 + # <<: *test + # plugins: + # - JuliaCI/julia#v1: + # version: "1.8" + # - JuliaCI/julia-test#v1: + # julia_args: "--threads=1" + # - JuliaCI/julia-coverage#v1: + # codecov: true + # - label: Julia 1.9 + # timeout_in_minutes: 90 + # <<: *test + # plugins: + # - JuliaCI/julia#v1: + # version: "1.9" + # - JuliaCI/julia-test#v1: + # julia_args: "--threads=1" + # - JuliaCI/julia-coverage#v1: + # codecov: true + # - label: Julia 1.10 + # timeout_in_minutes: 90 + # <<: *test + # plugins: + # - JuliaCI/julia#v1: + # version: "1.10" + # - JuliaCI/julia-test#v1: + # julia_args: "--threads=1" + # - JuliaCI/julia-coverage#v1: + # codecov: true + # - label: Julia nightly + # timeout_in_minutes: 90 + # <<: *test + # plugins: + # - JuliaCI/julia#v1: + # version: "1.10-nightly" + # - JuliaCI/julia-test#v1: + # julia_args: "--threads=1" + # - JuliaCI/julia-coverage#v1: + # codecov: true - label: Julia 1.8 (macOS) timeout_in_minutes: 90 <<: *test @@ -69,50 +69,50 @@ steps: julia_args: "--threads=1" - JuliaCI/julia-coverage#v1: codecov: true - - label: Julia 1.8 - TimespanLogging - timeout_in_minutes: 20 - <<: *test - plugins: - - JuliaCI/julia#v1: - version: "1.8" - - JuliaCI/julia-coverage#v1: - codecov: true - command: "julia --project -e 'using Pkg; Pkg.instantiate(); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.test(\"TimespanLogging\")'" - - label: Julia 1.8 - DaggerWebDash - timeout_in_minutes: 20 - <<: *test - plugins: - - JuliaCI/julia#v1: - version: "1.8" - - JuliaCI/julia-coverage#v1: - codecov: true - command: "julia -e 'using Pkg; Pkg.develop(;path=pwd()); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.develop(;path=\"lib/DaggerWebDash\"); include(\"lib/DaggerWebDash/test/runtests.jl\")'" - - label: Benchmarks - timeout_in_minutes: 120 - <<: *bench - plugins: - - JuliaCI/julia#v1: - version: "1.8" - - JuliaCI/julia-test#v1: - run_tests: false - command: "julia -e 'using Pkg; Pkg.add(\"BenchmarkTools\"); Pkg.develop(;path=pwd())'; JULIA_PROJECT=\"$PWD\" julia --project benchmarks/benchmark.jl" - env: - JULIA_NUM_THREADS: "4" - BENCHMARK: "nmf:raw,dagger" - BENCHMARK_PROCS: "4:4" - BENCHMARK_SCALE: "5:5:50" - artifacts: - - benchmarks/result* - - label: DTables.jl stability test - timeout_in_minutes: 20 - plugins: - - JuliaCI/julia#v1: - version: "1.8" - env: - JULIA_NUM_THREADS: "4" - agents: - queue: "juliaecosystem" - sandbox_capable: "true" - os: linux - arch: x86_64 - command: "git clone https://github.com/JuliaParallel/DTables.jl.git ; julia -t4 -e 'using Pkg; Pkg.activate(\"DTables.jl\"); Pkg.develop(;path=\".\"); Pkg.instantiate(); Pkg.test()'" + # - label: Julia 1.8 - TimespanLogging + # timeout_in_minutes: 20 + # <<: *test + # plugins: + # - JuliaCI/julia#v1: + # version: "1.8" + # - JuliaCI/julia-coverage#v1: + # codecov: true + # command: "julia --project -e 'using Pkg; Pkg.instantiate(); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.test(\"TimespanLogging\")'" + # - label: Julia 1.8 - DaggerWebDash + # timeout_in_minutes: 20 + # <<: *test + # plugins: + # - JuliaCI/julia#v1: + # version: "1.8" + # - JuliaCI/julia-coverage#v1: + # codecov: true + # command: "julia -e 'using Pkg; Pkg.develop(;path=pwd()); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.develop(;path=\"lib/DaggerWebDash\"); include(\"lib/DaggerWebDash/test/runtests.jl\")'" + # - label: Benchmarks + # timeout_in_minutes: 120 + # <<: *bench + # plugins: + # - JuliaCI/julia#v1: + # version: "1.8" + # - JuliaCI/julia-test#v1: + # run_tests: false + # command: "julia -e 'using Pkg; Pkg.add(\"BenchmarkTools\"); Pkg.develop(;path=pwd())'; JULIA_PROJECT=\"$PWD\" julia --project benchmarks/benchmark.jl" + # env: + # JULIA_NUM_THREADS: "4" + # BENCHMARK: "nmf:raw,dagger" + # BENCHMARK_PROCS: "4:4" + # BENCHMARK_SCALE: "5:5:50" + # artifacts: + # - benchmarks/result* + # - label: DTables.jl stability test + # timeout_in_minutes: 20 + # plugins: + # - JuliaCI/julia#v1: + # version: "1.8" + # env: + # JULIA_NUM_THREADS: "4" + # agents: + # queue: "juliaecosystem" + # sandbox_capable: "true" + # os: linux + # arch: x86_64 + # command: "git clone https://github.com/JuliaParallel/DTables.jl.git ; julia -t4 -e 'using Pkg; Pkg.activate(\"DTables.jl\"); Pkg.develop(;path=\".\"); Pkg.instantiate(); Pkg.test()'" diff --git a/test/scheduler.jl b/test/scheduler.jl index e6afe61d0..3c3ab574f 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -94,97 +94,98 @@ end end @testset "Scheduler" begin - @testset "Scheduler options" begin - @testset "single worker" begin - options = SchedulerOptions(;single=1) - a = delayed(checkwid)(1) - b = delayed(checkwid)(2) - c = delayed(checkwid)(a,b) - - @test collect(Context([1,workers()...]), c; options=options) == 1 - end - @static if VERSION >= v"1.3.0-DEV.573" - if Threads.nthreads() == 1 - @warn "Threading tests running in serial" - end - @testset "proclist" begin - options = SchedulerOptions(;proclist=[Dagger.ThreadProc]) - a = delayed(checktid)(1) - b = delayed(checktid)(2) - c = delayed(checktid)(a,b) - - @test collect(Context(), c; options=options) == 1 - end - end - @testset "allow errors" begin - options = SchedulerOptions(;allow_errors=true) - a = delayed(error)("Test") - ex = try - collect(a) - catch err - err - end - @test Dagger.Sch.unwrap_nested_exception(ex) isa ErrorException - end - end - @testset "Thunk options" begin - @testset "meta" begin - a = Dagger.@par rand(4) - b = Dagger.@par meta=true (a->begin - @assert a isa Dagger.Chunk - Dagger.tochunk(myid()) - end)(a) - if Threads.nthreads() == 1 - @test collect(b) in workers() - else - @test collect(b) in procs() - end - end - @testset "single worker" begin - options = ThunkOptions(;single=1) - a = delayed(checkwid; options=options)(1) - - @test collect(Context([1,workers()...]), a) == 1 - end - @static if VERSION >= v"1.3.0-DEV.573" - @testset "proclist" begin - options = ThunkOptions(;proclist=[Dagger.ThreadProc]) - a = delayed(checktid; options=options)(1) - - @test collect(Context(), a) == 1 - end - end - @everywhere Dagger.add_processor_callback!(()->FakeProc(), :fakeproc) - @testset "proclist FakeProc" begin - @test Dagger.iscompatible_arg(FakeProc(), nothing, Int) == true - @test Dagger.iscompatible_arg(FakeProc(), nothing, FakeVal) == true - @test Dagger.iscompatible_arg(FakeProc(), nothing, Float64) == false - @test Dagger.default_enabled(Dagger.ThreadProc(1,1)) == true - @test Dagger.default_enabled(FakeProc()) == false - - opts = Dagger.Sch.ThunkOptions(;proclist=[Dagger.ThreadProc]) - as = [delayed(identity; options=opts)(i) for i in 1:5] - opts = Dagger.Sch.ThunkOptions(;proclist=[FakeProc]) - b = delayed(fakesum; options=opts)(as...) - - @test collect(Context(), b) == FakeVal(57) - end - @everywhere Dagger.delete_processor_callback!(:fakeproc) - @test_skip "procutil" - #= - @testset "procutil" begin - opts = ThunkOptions(;procutil=Dict(Dagger.ThreadProc=>0.25)) - as = [delayed(checkpressure; options=opts)(i) for i in 1:30] - b = delayed(checkpressure)(as...) - collect(b) - end - =# - @testset "allow errors" begin - opts = ThunkOptions(;allow_errors=true) - a = delayed(error; options=opts)("Test") - @test_throws_unwrap Dagger.ThunkFailedException collect(a) - end - end + # @testset "Scheduler options" begin + # @testset "single worker" begin + # options = SchedulerOptions(;single=1) + # a = delayed(checkwid)(1) + # b = delayed(checkwid)(2) + # c = delayed(checkwid)(a,b) + + # @test collect(Context([1,workers()...]), c; options=options) == 1 + # end + # @static if VERSION >= v"1.3.0-DEV.573" + # if Threads.nthreads() == 1 + # @warn "Threading tests running in serial" + # end + # @testset "proclist" begin + # options = SchedulerOptions(;proclist=[Dagger.ThreadProc]) + # a = delayed(checktid)(1) + # b = delayed(checktid)(2) + # c = delayed(checktid)(a,b) + + # @test collect(Context(), c; options=options) == 1 + # end + # end + # @testset "allow errors" begin + # options = SchedulerOptions(;allow_errors=true) + # a = delayed(error)("Test") + # ex = try + # collect(a) + # catch err + # err + # end + # @test Dagger.Sch.unwrap_nested_exception(ex) isa ErrorException + # end + # end + + # @testset "Thunk options" begin + # @testset "meta" begin + # a = Dagger.@par rand(4) + # b = Dagger.@par meta=true (a->begin + # @assert a isa Dagger.Chunk + # Dagger.tochunk(myid()) + # end)(a) + # if Threads.nthreads() == 1 + # @test collect(b) in workers() + # else + # @test collect(b) in procs() + # end + # end + # @testset "single worker" begin + # options = ThunkOptions(;single=1) + # a = delayed(checkwid; options=options)(1) + + # @test collect(Context([1,workers()...]), a) == 1 + # end + # @static if VERSION >= v"1.3.0-DEV.573" + # @testset "proclist" begin + # options = ThunkOptions(;proclist=[Dagger.ThreadProc]) + # a = delayed(checktid; options=options)(1) + + # @test collect(Context(), a) == 1 + # end + # end + # @everywhere Dagger.add_processor_callback!(()->FakeProc(), :fakeproc) + # @testset "proclist FakeProc" begin + # @test Dagger.iscompatible_arg(FakeProc(), nothing, Int) == true + # @test Dagger.iscompatible_arg(FakeProc(), nothing, FakeVal) == true + # @test Dagger.iscompatible_arg(FakeProc(), nothing, Float64) == false + # @test Dagger.default_enabled(Dagger.ThreadProc(1,1)) == true + # @test Dagger.default_enabled(FakeProc()) == false + + # opts = Dagger.Sch.ThunkOptions(;proclist=[Dagger.ThreadProc]) + # as = [delayed(identity; options=opts)(i) for i in 1:5] + # opts = Dagger.Sch.ThunkOptions(;proclist=[FakeProc]) + # b = delayed(fakesum; options=opts)(as...) + + # @test collect(Context(), b) == FakeVal(57) + # end + # @everywhere Dagger.delete_processor_callback!(:fakeproc) + # @test_skip "procutil" + # #= + # @testset "procutil" begin + # opts = ThunkOptions(;procutil=Dict(Dagger.ThreadProc=>0.25)) + # as = [delayed(checkpressure; options=opts)(i) for i in 1:30] + # b = delayed(checkpressure)(as...) + # collect(b) + # end + # =# + # @testset "allow errors" begin + # opts = ThunkOptions(;allow_errors=true) + # a = delayed(error; options=opts)("Test") + # @test_throws_unwrap Dagger.ThunkFailedException collect(a) + # end + # end @testset "Modify workers in running job" begin # Test that we can add/remove workers while scheduler is running. @@ -219,18 +220,10 @@ end @testset "Add new workers" begin ps = [] try - ps1 = addprocs(2, exeflags="--project") - append!(ps, ps1) - - @everywhere vcat(ps1, myid()) $setup - - ctx = Context(ps1) - ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...) - - job = @async collect(ctx, ts) + @everywhere vcat(workers(), myid()) $setup - while !istaskstarted(job) - sleep(0.001) + ts = map(1:10) do i + Dagger.@spawn testfun(i) end # Will not be added, so they should never appear in output @@ -240,15 +233,15 @@ end ps3 = addprocs(2, exeflags="--project") append!(ps, ps3) @everywhere ps3 $setup - addprocs!(ctx, ps3) - @test length(procs(ctx)) == 4 + addprocs!(ps3) + @test length(procs(Dagger.Sch.eager_context())) == 6 @everywhere ps3 blocked=false - ps_used = fetch(job) + ps_used = fetch.(ts) @test ps_used isa Vector - @test any(p -> p in ps_used, ps1) + @test any(p -> p in ps_used, workers()) @test any(p -> p in ps_used, ps3) @test !any(p -> p in ps2, ps_used) finally @@ -316,202 +309,202 @@ end end end=# - @testset "Remove all workers throws" begin - ps = [] - try - ps1 = addprocs(2, exeflags="--project") - append!(ps, ps1) + # @testset "Remove all workers throws" begin + # ps = [] + # try + # ps1 = addprocs(2, exeflags="--project") + # append!(ps, ps1) - @everywhere vcat(ps1, myid()) $setup + # @everywhere vcat(ps1, myid()) $setup - ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...) + # ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...) - ctx = Context(ps1) - job = @async collect(ctx, ts) + # ctx = Context(ps1) + # job = @async collect(ctx, ts) - while !istaskstarted(job) - sleep(0.001) - end + # while !istaskstarted(job) + # sleep(0.001) + # end - rmprocs!(ctx, ps1) - @test length(procs(ctx)) == 0 + # rmprocs!(ctx, ps1) + # @test length(procs(ctx)) == 0 - @everywhere ps1 blocked=false - if VERSION >= v"1.3.0-alpha.110" - @test_throws TaskFailedException fetch(job) - else - @test_throws Exception fetch(job) - end - finally - wait(rmprocs(ps)) - end - end + # @everywhere ps1 blocked=false + # if VERSION >= v"1.3.0-alpha.110" + # @test_throws TaskFailedException fetch(job) + # else + # @test_throws Exception fetch(job) + # end + # finally + # wait(rmprocs(ps)) + # end + # end end end -@testset "Scheduler algorithms" begin - # New function to hide from scheduler's function cost cache - mynothing(args...) = nothing - - # New non-singleton struct to hide from `approx_size` - struct MyStruct - x::Int - end - - state = Dagger.Sch.EAGER_STATE[] - tproc1 = Dagger.ThreadProc(1, 1) - tproc2 = Dagger.ThreadProc(first(workers()), 1) - procs = [tproc1, tproc2] - - pres1 = state.worker_time_pressure[1][tproc1] - pres2 = state.worker_time_pressure[first(workers())][tproc2] - tx_rate = state.transfer_rate[] - - for (args, tx_size) in [ - ([1, 2], 0), - ([Dagger.tochunk(1), 2], sizeof(Int)), - ([1, Dagger.tochunk(2)], sizeof(Int)), - ([Dagger.tochunk(1), Dagger.tochunk(2)], 2*sizeof(Int)), - # TODO: Why does this work? Seems slow - ([Dagger.tochunk(MyStruct(1))], sizeof(MyStruct)), - ([Dagger.tochunk(MyStruct(1)), Dagger.tochunk(1)], sizeof(MyStruct)+sizeof(Int)), - ] - for arg in args - if arg isa Chunk - aff = Dagger.affinity(arg) - @test aff[1] == OSProc(1) - @test aff[2] == MemPool.approx_size(MemPool.poolget(arg.handle)) - end - end - - cargs = map(arg->MemPool.poolget(arg.handle), filter(arg->isa(arg, Chunk), args)) - est_tx_size = Dagger.Sch.impute_sum(map(MemPool.approx_size, cargs)) - @test est_tx_size == tx_size - - t = delayed(mynothing)(args...) - inputs = Dagger.Sch.collect_task_inputs(state, t) - sorted_procs, costs = Dagger.Sch.estimate_task_costs(state, procs, t, inputs) - - @test tproc1 in sorted_procs - @test tproc2 in sorted_procs - if length(cargs) > 0 - @test sorted_procs[1] == tproc1 - @test sorted_procs[2] == tproc2 - end - - @test haskey(costs, tproc1) - @test haskey(costs, tproc2) - @test costs[tproc1] ≈ pres1 # All chunks are local - @test costs[tproc2] ≈ (tx_size/tx_rate) + pres2 # All chunks are remote - end -end - -@testset "Dynamic Thunks" begin - @testset "Exec" begin - a = delayed(dynamic_exec)(2) - @test collect(Context(), a) == 1 - end - @testset "Exec Error" begin - a = delayed(dynamic_exec_err)(1) - try - collect(Context(), a) - @test false - catch err - @test err isa RemoteException - end - end - @test_skip "Halt" #=begin - a = delayed(dynamic_halt)(1) - try - collect(Context(), a) - @test false - catch err - @test err isa SchedulerHaltedException - end - end=# - @testset "DAG querying" begin - a = delayed(identity)(1) - b = delayed(x->x+2)(a) - c = delayed(x->x-1)(a) - d = delayed(dynamic_get_dag)(b, c) - ids = collect(Context(), d) - @test ids isa Dict - @test length(keys(ids)) == 4 - - a_id = ThunkID(a.id) - b_id = ThunkID(b.id) - c_id = ThunkID(c.id) - d_id = ThunkID(d.id) - - @test haskey(ids, d_id) - @test length(ids[d_id]) == 0 # no one waiting on our result - @test length(ids[a_id]) == 0 # b and c finished, our result is unneeded - @test length(ids[b_id]) == 1 # d is still executing - @test length(ids[c_id]) == 1 # d is still executing - @test pop!(ids[b_id]) == d_id - @test pop!(ids[c_id]) == d_id - end - @testset "Add Thunk" begin - a = delayed(dynamic_add_thunk)(1) - res = collect(Context(), a) - @test res == 2 - @testset "self as input" begin - a = delayed(dynamic_add_thunk_self_dominated)(1) - @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch result of dominated thunk" collect(Context(), a) - end - end - @testset "Fetch/Wait" begin - @testset "multiple" begin - a = delayed(dynamic_wait_fetch_multiple)(delayed(+)(1,2)) - @test collect(Context(), a) == 3 - end - @testset "self" begin - a = delayed(dynamic_fetch_self)(1) - @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch own result" collect(Context(), a) - end - @testset "dominated" begin - a = delayed(identity)(delayed(dynamic_fetch_dominated)(1)) - @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch result of dominated thunk" collect(Context(), a) - end - end -end - -c1 = Dagger.tochunk(1) -c2 = Dagger.tochunk(2) -@everywhere begin -function testpresent(x,y) - @assert haskey(Dagger.Sch.CHUNK_CACHE, $c1) - @assert haskey(Dagger.Sch.CHUNK_CACHE, $c2) - x+y -end -function testevicted(x) - sleep(1) - @assert !haskey(Dagger.Sch.CHUNK_CACHE, $c1) - @assert !haskey(Dagger.Sch.CHUNK_CACHE, $c2) - x -end -end - -@test_skip "Chunk Caching" -#= -@testset "Chunk Caching" begin - compute(delayed(testevicted)(delayed(testpresent)(c1,c2))) -end -=# - -@testset "MemPool.approx_size" begin - for (obj, size) in [ - (rand(100), 100*sizeof(Float64)), - (rand(Float32, 100), 100*sizeof(Float32)), - (rand(1:10, 100), 100*sizeof(Int)), - (fill(:a, 10), missing), - (fill("a", 10), missing), - (fill('a', 10), missing), - ] - if size !== missing - @test MemPool.approx_size(obj) == size - else - @test MemPool.approx_size(obj) !== nothing - end - end -end +# @testset "Scheduler algorithms" begin +# # New function to hide from scheduler's function cost cache +# mynothing(args...) = nothing + +# # New non-singleton struct to hide from `approx_size` +# struct MyStruct +# x::Int +# end + +# state = Dagger.Sch.EAGER_STATE[] +# tproc1 = Dagger.ThreadProc(1, 1) +# tproc2 = Dagger.ThreadProc(first(workers()), 1) +# procs = [tproc1, tproc2] + +# pres1 = state.worker_time_pressure[1][tproc1] +# pres2 = state.worker_time_pressure[first(workers())][tproc2] +# tx_rate = state.transfer_rate[] + +# for (args, tx_size) in [ +# ([1, 2], 0), +# ([Dagger.tochunk(1), 2], sizeof(Int)), +# ([1, Dagger.tochunk(2)], sizeof(Int)), +# ([Dagger.tochunk(1), Dagger.tochunk(2)], 2*sizeof(Int)), +# # TODO: Why does this work? Seems slow +# ([Dagger.tochunk(MyStruct(1))], sizeof(MyStruct)), +# ([Dagger.tochunk(MyStruct(1)), Dagger.tochunk(1)], sizeof(MyStruct)+sizeof(Int)), +# ] +# for arg in args +# if arg isa Chunk +# aff = Dagger.affinity(arg) +# @test aff[1] == OSProc(1) +# @test aff[2] == MemPool.approx_size(MemPool.poolget(arg.handle)) +# end +# end + +# cargs = map(arg->MemPool.poolget(arg.handle), filter(arg->isa(arg, Chunk), args)) +# est_tx_size = Dagger.Sch.impute_sum(map(MemPool.approx_size, cargs)) +# @test est_tx_size == tx_size + +# t = delayed(mynothing)(args...) +# inputs = Dagger.Sch.collect_task_inputs(state, t) +# sorted_procs, costs = Dagger.Sch.estimate_task_costs(state, procs, t, inputs) + +# @test tproc1 in sorted_procs +# @test tproc2 in sorted_procs +# if length(cargs) > 0 +# @test sorted_procs[1] == tproc1 +# @test sorted_procs[2] == tproc2 +# end + +# @test haskey(costs, tproc1) +# @test haskey(costs, tproc2) +# @test costs[tproc1] ≈ pres1 # All chunks are local +# @test costs[tproc2] ≈ (tx_size/tx_rate) + pres2 # All chunks are remote +# end +# end + +# @testset "Dynamic Thunks" begin +# @testset "Exec" begin +# a = delayed(dynamic_exec)(2) +# @test collect(Context(), a) == 1 +# end +# @testset "Exec Error" begin +# a = delayed(dynamic_exec_err)(1) +# try +# collect(Context(), a) +# @test false +# catch err +# @test err isa RemoteException +# end +# end +# @test_skip "Halt" #=begin +# a = delayed(dynamic_halt)(1) +# try +# collect(Context(), a) +# @test false +# catch err +# @test err isa SchedulerHaltedException +# end +# end=# +# @testset "DAG querying" begin +# a = delayed(identity)(1) +# b = delayed(x->x+2)(a) +# c = delayed(x->x-1)(a) +# d = delayed(dynamic_get_dag)(b, c) +# ids = collect(Context(), d) +# @test ids isa Dict +# @test length(keys(ids)) == 4 + +# a_id = ThunkID(a.id) +# b_id = ThunkID(b.id) +# c_id = ThunkID(c.id) +# d_id = ThunkID(d.id) + +# @test haskey(ids, d_id) +# @test length(ids[d_id]) == 0 # no one waiting on our result +# @test length(ids[a_id]) == 0 # b and c finished, our result is unneeded +# @test length(ids[b_id]) == 1 # d is still executing +# @test length(ids[c_id]) == 1 # d is still executing +# @test pop!(ids[b_id]) == d_id +# @test pop!(ids[c_id]) == d_id +# end +# @testset "Add Thunk" begin +# a = delayed(dynamic_add_thunk)(1) +# res = collect(Context(), a) +# @test res == 2 +# @testset "self as input" begin +# a = delayed(dynamic_add_thunk_self_dominated)(1) +# @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch result of dominated thunk" collect(Context(), a) +# end +# end +# @testset "Fetch/Wait" begin +# @testset "multiple" begin +# a = delayed(dynamic_wait_fetch_multiple)(delayed(+)(1,2)) +# @test collect(Context(), a) == 3 +# end +# @testset "self" begin +# a = delayed(dynamic_fetch_self)(1) +# @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch own result" collect(Context(), a) +# end +# @testset "dominated" begin +# a = delayed(identity)(delayed(dynamic_fetch_dominated)(1)) +# @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch result of dominated thunk" collect(Context(), a) +# end +# end +# end + +# c1 = Dagger.tochunk(1) +# c2 = Dagger.tochunk(2) +# @everywhere begin +# function testpresent(x,y) +# @assert haskey(Dagger.Sch.CHUNK_CACHE, $c1) +# @assert haskey(Dagger.Sch.CHUNK_CACHE, $c2) +# x+y +# end +# function testevicted(x) +# sleep(1) +# @assert !haskey(Dagger.Sch.CHUNK_CACHE, $c1) +# @assert !haskey(Dagger.Sch.CHUNK_CACHE, $c2) +# x +# end +# end + +# @test_skip "Chunk Caching" +# #= +# @testset "Chunk Caching" begin +# compute(delayed(testevicted)(delayed(testpresent)(c1,c2))) +# end +# =# + +# @testset "MemPool.approx_size" begin +# for (obj, size) in [ +# (rand(100), 100*sizeof(Float64)), +# (rand(Float32, 100), 100*sizeof(Float32)), +# (rand(1:10, 100), 100*sizeof(Int)), +# (fill(:a, 10), missing), +# (fill("a", 10), missing), +# (fill('a', 10), missing), +# ] +# if size !== missing +# @test MemPool.approx_size(obj) == size +# else +# @test MemPool.approx_size(obj) !== nothing +# end +# end +# end