diff --git a/base/condition.jl b/base/condition.jl index fd771c9be346a..1636f691f2826 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -135,6 +135,7 @@ in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) """ function wait(c::GenericCondition; first::Bool=false) ct = current_task() + record_cpu_time!(ct) _wait2(c, ct, first) token = unlockall(c.lock) try diff --git a/base/options.jl b/base/options.jl index f535c27d99122..7fb7f8322e154 100644 --- a/base/options.jl +++ b/base/options.jl @@ -60,6 +60,7 @@ struct JLOptions heap_size_hint::UInt64 trace_compile_timing::Int8 trim::Int8 + task_timing::Int8 end # This runs early in the sysimage != is not defined yet diff --git a/base/task.jl b/base/task.jl index f3a134f374421..8d3a18e3d647d 100644 --- a/base/task.jl +++ b/base/task.jl @@ -810,6 +810,12 @@ end # runtime system hook called when a task finishes function task_done_hook(t::Task) # `finish_task` sets `sigatomic` before entering this function + if t.is_timing_enabled + # user_time -task-finished-> wait_time + now = time_ns() + record_cpu_time!(t, now) + record_wall_time!(t, now) + end err = istaskfailed(t) result = task_result(t) handled = false @@ -972,7 +978,13 @@ function enq_work(t::Task) return t end -schedule(t::Task) = enq_work(t) +function schedule(t::Task) + # user_time -task-(re)scheduled-> wait_time + if t.is_timing_enabled && t.first_enqueued_at == 0 + t.first_enqueued_at = time_ns() + end + enq_work(t) +end """ schedule(t::Task, [val]; error=false) @@ -1018,6 +1030,10 @@ true function schedule(t::Task, @nospecialize(arg); error=false) # schedule a task to be (re)started with the given value or exception t._state === task_state_runnable || Base.error("schedule: Task not runnable") + # user_time -task-(re)scheduled-> wait_time + if t.is_timing_enabled && t.first_enqueued_at == 0 + t.first_enqueued_at = time_ns() + end if error q = t.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, t) setfield!(t, :result, arg) @@ -1039,6 +1055,7 @@ tasks. """ function yield() ct = current_task() + record_cpu_time!(ct) enq_work(ct) try wait() @@ -1060,6 +1077,7 @@ Throws a `ConcurrencyViolationError` if `t` is the currently running task. """ function yield(t::Task, @nospecialize(x=nothing)) current = current_task() + record_cpu_time!(current) t === current && throw(ConcurrencyViolationError("Cannot yield to currently running task!")) (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable")) t.result = x @@ -1084,6 +1102,9 @@ function yieldto(t::Task, @nospecialize(x=nothing)) elseif t._state === task_state_failed throw(t.result) end + if t.is_timing_enabled && t.first_enqueued_at == 0 + t.first_enqueued_at = time_ns() + end t.result = x set_next_task(t) return try_yieldto(identity) @@ -1097,6 +1118,12 @@ function try_yieldto(undo) rethrow() end ct = current_task() + # scheduler -task-started-> user + # scheduler -task-resumed-> user + if ct.is_timing_enabled + # @assert ct.last_dequeued_at == 0 + ct.last_dequeued_at = time_ns() + end if ct._isexception exc = ct.result ct.result = nothing @@ -1110,6 +1137,9 @@ end # yield to a task, throwing an exception in it function throwto(t::Task, @nospecialize exc) + if t.is_timing_enabled && t.first_enqueued_at == 0 + t.first_enqueued_at = time_ns() + end t.result = exc t._isexception = true set_next_task(t) @@ -1176,3 +1206,20 @@ if Sys.iswindows() else pause() = ccall(:pause, Cvoid, ()) end + +function record_cpu_time!(t::Task, stopped_at::UInt64=time_ns()) + if t.is_timing_enabled + @assert t.last_dequeued_at != 0 + t.cpu_time_ns += stopped_at - t.last_dequeued_at + t.last_dequeued_at = 0 + end + return t +end + +function record_wall_time!(t::Task, done_at::UInt64=time_ns()) + if t.is_timing_enabled + @assert t.first_enqueued_at != 0 + t.wall_time_ns = done_at - t.first_enqueued_at + end + return t +end diff --git a/base/timing.jl b/base/timing.jl index 1de3727756829..ab38f812a916e 100644 --- a/base/timing.jl +++ b/base/timing.jl @@ -86,6 +86,64 @@ end # total time spend in garbage collection, in nanoseconds gc_time_ns() = ccall(:jl_gc_total_hrtime, UInt64, ()) +""" + task_cpu_time_ns(t::Task) -> UInt64 + +Return the total nanoseconds that the task `t` has spent running. +This metric is only updated when the task yields or completes. +See also [`task_wall_time_ns`](@ref). + +Will be `UInt64(0)` if task timings are not enabled. +See [`Base.task_timing`](@ref). + +!!! note "This metric is from the Julia scheduler" + A task may be running on an OS thread that is descheduled by the OS + scheduler, this time still counts towards the metric. + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_cpu_time_ns(t::Task) + return t.cpu_time_ns +end + +""" + task_wall_time_ns(t::Task) -> UInt64 + +Return the total nanoseconds that the task `t` was runnable. +This is the time since the task entered the run queue until the time at which it completed, +or until the current time if the task has not yet completed. +See also [`task_cpu_time_ns`](@ref). + +Will be `UInt64(0)` if task timings are not enabled. +See [`Base.task_timing`](@ref). + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_wall_time_ns(t::Task) + # TODO: report up til current time if not done? too racy? + # return istaskdone(t) ? t.wall_time_ns : time_ns() - t.first_enqueued_at + return t.wall_time_ns +end + +""" + Base.task_timing(::Bool) + +Enable or disable the collection of per-task timing information. +Task created when Base.task_timing(true) is in effect will have [`task_cpu_time_ns`](@ref) +and [`task_wall_time_ns`](@ref) timing information available. +""" +function task_timing(b::Bool) + if b + ccall(:jl_task_timing_enable, Cvoid, ()) + else + # TODO: prevent decrementing the counter below zero + ccall(:jl_task_timing_disable, Cvoid, ()) + end + return nothing +end + """ Base.gc_live_bytes() diff --git a/doc/man/julia.1 b/doc/man/julia.1 index 56cb690d66eeb..c9c95bd80c9d4 100644 --- a/doc/man/julia.1 +++ b/doc/man/julia.1 @@ -294,6 +294,10 @@ If --trace-compile is enabled show how long each took to compile in ms --trace-dispatch={stderr|name} Print precompile statements for methods dispatched during execution or save to stderr or a path. +.TP +--task-timing +Enabled the collection of per-task timing metrics. + .TP -image-codegen Force generate code in imaging mode diff --git a/doc/src/manual/command-line-interface.md b/doc/src/manual/command-line-interface.md index 734d7031db5e8..60089d1eeed9c 100644 --- a/doc/src/manual/command-line-interface.md +++ b/doc/src/manual/command-line-interface.md @@ -217,6 +217,7 @@ The following is a complete list of command-line switches available when launchi |`--trace-compile={stderr\|name}` |Print precompile statements for methods compiled during execution or save to stderr or a path. Methods that were recompiled are printed in yellow or with a trailing comment if color is not supported| |`--trace-compile-timing` |If --trace-compile is enabled show how long each took to compile in ms| |`--trace-dispatch={stderr\|name}` |Print precompile statements for methods dispatched during execution or save to stderr or a path.| +|`--task-timing` |Enable tasks the collection of per-task timing metrics| |`--image-codegen` |Force generate code in imaging mode| |`--permalloc-pkgimg={yes\|no*}` |Copy the data section of package images into memory| |`--trim={no*\|safe\|unsafe\|unsafe-warn}` |Build a sysimage including only code provably reachable from methods marked by calling `entrypoint`. The three non-default options differ in how they handle dynamic call sites. In safe mode, such sites result in compile-time errors. In unsafe mode, such sites are allowed but the resulting binary might be missing needed code and can throw runtime errors. With unsafe-warn, such sites will trigger warnings at compile-time and might error at runtime.| diff --git a/src/init.c b/src/init.c index 413d4e8055e54..b5ec4f3bba23a 100644 --- a/src/init.c +++ b/src/init.c @@ -874,6 +874,9 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ post_boot_hooks(); } + if (jl_options.task_timing) { + jl_atomic_fetch_add(&jl_task_timing_enabled, 1); + } if (jl_base_module == NULL) { // nthreads > 1 requires code in Base jl_atomic_store_relaxed(&jl_n_threads, 1); diff --git a/src/jlapi.c b/src/jlapi.c index a3621385a437e..d13ff0a7f763e 100644 --- a/src/jlapi.c +++ b/src/jlapi.c @@ -809,6 +809,33 @@ JL_DLLEXPORT uint64_t jl_cumulative_recompile_time_ns(void) return jl_atomic_load_relaxed(&jl_cumulative_recompile_time); } +/** + * @brief Enable per-task timing. + */ +JL_DLLEXPORT void jl_task_timing_enable(void) +{ + // Increment the flag to allow reentrant callers. + jl_atomic_fetch_add(&jl_task_timing_enabled, 1); +} + +/** + * @brief Disable per-task timing. + */ +JL_DLLEXPORT void jl_task_timing_disable(void) +{ + jl_atomic_fetch_add(&jl_task_timing_enabled, -1); +} + +// TODO: remove this +JL_DLLEXPORT void jl_is_task_timing_enabled(void) +{ + jl_atomic_load_relaxed(&jl_task_timing_enabled); +} +JL_DLLEXPORT void jl_is_compile_timing_enabled(void) +{ + jl_atomic_load_relaxed(&jl_measure_compile_time_enabled); +} + /** * @brief Retrieve floating-point environment constants. * diff --git a/src/jloptions.c b/src/jloptions.c index 35f0a76e3f6e7..747b854a3bb86 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -103,6 +103,7 @@ JL_DLLEXPORT void jl_init_options(void) 0, // heap-size-hint 0, // trace_compile_timing 0, // trim + 0, // task_timing }; jl_options_initialized = 1; } @@ -265,6 +266,7 @@ static const char opts_hidden[] = " comment if color is not supported\n" " --trace-compile-timing If --trace-compile is enabled show how long each took to\n" " compile in ms\n" + " --task-timing Enable collection of per-task timing data.\n" " --image-codegen Force generate code in imaging mode\n" " --permalloc-pkgimg={yes|no*} Copy the data section of package images into memory\n" " --trim={no*|safe|unsafe|unsafe-warn}\n" @@ -296,6 +298,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) opt_trace_compile, opt_trace_compile_timing, opt_trace_dispatch, + opt_task_timing, opt_math_mode, opt_worker, opt_bind_to, @@ -375,6 +378,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) { "trace-compile", required_argument, 0, opt_trace_compile }, { "trace-compile-timing", no_argument, 0, opt_trace_compile_timing }, { "trace-dispatch", required_argument, 0, opt_trace_dispatch }, + { "task-timing", no_argument, 0, opt_task_timing }, { "math-mode", required_argument, 0, opt_math_mode }, { "handle-signals", required_argument, 0, opt_handle_signals }, // hidden command line options @@ -964,6 +968,9 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) else jl_errorf("julia: invalid argument to --trim={safe|no|unsafe|unsafe-warn} (%s)", optarg); break; + case opt_task_timing: + jl_options.task_timing = 1; + break; default: jl_errorf("julia: unhandled option -- %c\n" "This is a bug, please report it.", c); diff --git a/src/jloptions.h b/src/jloptions.h index e58797caace3c..1eabc195b8d3e 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -64,6 +64,7 @@ typedef struct { uint64_t heap_size_hint; int8_t trace_compile_timing; int8_t trim; + int8_t task_timing; } jl_options_t; #endif diff --git a/src/jltypes.c b/src/jltypes.c index 11f1d11a14edc..9053689960a94 100644 --- a/src/jltypes.c +++ b/src/jltypes.c @@ -3744,7 +3744,7 @@ void jl_init_types(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(16, + jl_perm_symsvec(21, "next", "queue", "storage", @@ -3760,8 +3760,13 @@ void jl_init_types(void) JL_GC_DISABLED "_state", "sticky", "_isexception", - "priority"), - jl_svec(16, + "priority", + "is_timing_enabled", + "first_enqueued_at", + "last_dequeued_at", + "cpu_time_ns", + "wall_time_ns"), + jl_svec(21, jl_any_type, jl_any_type, jl_any_type, @@ -3777,7 +3782,12 @@ void jl_init_types(void) JL_GC_DISABLED jl_uint8_type, jl_bool_type, jl_bool_type, - jl_uint16_type), + jl_uint16_type, + jl_bool_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type), jl_emptysvec, 0, 1, 6); XX(task); diff --git a/src/julia.h b/src/julia.h index 1d36dba519700..1b0c6f44085d3 100644 --- a/src/julia.h +++ b/src/julia.h @@ -2289,6 +2289,16 @@ typedef struct _jl_task_t { // uint8_t padding1; // multiqueue priority uint16_t priority; + // flag indicating whether or not this to measure the wall and cpu time of this task + uint8_t is_timing_enabled; + // timestamp this task first entered the run queue (TODO: int32 of ms instead?) + uint64_t first_enqueued_at; + // timestamp this task was most recently scheduled to run + uint64_t last_dequeued_at; + // time this task has spent running; updated when it yields + uint64_t cpu_time_ns; + // time between first entering the run queue and being done/failed. + uint64_t wall_time_ns; // hidden state: // cached floating point environment diff --git a/src/julia_internal.h b/src/julia_internal.h index 8c4ee9fca36e0..07c593e4ccb03 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -287,6 +287,9 @@ extern JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time; +// Global *atomic* integer controlling *process-wide* task timing. +extern JL_DLLEXPORT _Atomic(uint8_t) jl_task_timing_enabled; + #define jl_return_address() ((uintptr_t)__builtin_return_address(0)) STATIC_INLINE uint32_t jl_int32hash_fast(uint32_t a) diff --git a/src/task.c b/src/task.c index 5e1172a96a409..0698175d2a5b4 100644 --- a/src/task.c +++ b/src/task.c @@ -1146,6 +1146,11 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->ptls = NULL; t->world_age = ct->world_age; t->reentrant_timing = 0; + t->is_timing_enabled = jl_atomic_load_relaxed(&jl_task_timing_enabled) != 0; + t->first_enqueued_at = 0; + t->last_dequeued_at = 0; + t->cpu_time_ns = 0; + t->wall_time_ns = 0; jl_timing_task_init(t); if (t->ctx.copy_stack) @@ -1245,6 +1250,12 @@ CFI_NORETURN fesetenv(&ct->fenv); ct->ctx.started = 1; + // wait_time -task-started-> user_time + if (ct->is_timing_enabled) { + assert(ct->first_enqueued_at != 0); + assert(ct->last_dequeued_at == 0); + ct->last_dequeued_at = jl_hrtime(); + } JL_PROBE_RT_START_TASK(ct); jl_timing_block_task_enter(ct, ptls, NULL); if (jl_atomic_load_relaxed(&ct->_isexception)) { @@ -1596,6 +1607,17 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi) ct->ptls = ptls; ct->world_age = 1; // OK to run Julia code on this task ct->reentrant_timing = 0; + ct->cpu_time_ns = 0; + ct->wall_time_ns = 0; + if (ct->is_timing_enabled) { + uint64_t now = jl_hrtime(); + ct->first_enqueued_at = now; + ct->last_dequeued_at = now; + } + else { + ct->first_enqueued_at = 0; + ct->last_dequeued_at = 0; + } ptls->root_task = ct; jl_atomic_store_relaxed(&ptls->current_task, ct); JL_GC_PROMISE_ROOTED(ct); diff --git a/src/threading.c b/src/threading.c index 50944a24eb29b..2700e3c9387ea 100644 --- a/src/threading.c +++ b/src/threading.c @@ -49,6 +49,8 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0; +JL_DLLEXPORT _Atomic(uint8_t) jl_task_timing_enabled = 0; + JL_DLLEXPORT void *jl_get_ptls_states(void) { // mostly deprecated: use current_task instead diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index cc3f8950f0dc0..81e00dea1d561 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -783,6 +783,17 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` "Int(Base.JLOptions().fast_math)"`)) == JL_OPTIONS_FAST_MATH_DEFAULT end + let JL_OPTIONS_TASK_TIMING_OFF = 0, JL_OPTIONS_TASK_TIMING_ON = 1 + @test parse(Int,readchomp(`$exename -E + "Int(Base.JLOptions().task_timing)"`)) == JL_OPTIONS_TASK_TIMING_OFF + @test parse(Int, readchomp(`$exename --task-timing -E + "Int(Base.JLOptions().task_timing)"`)) == JL_OPTIONS_TASK_TIMING_ON + @test !parse(Bool, readchomp(`$exename -E + "fetch(Threads.@spawn current_task().is_timing_enabled)"`)) + @test parse(Bool, readchomp(`$exename --task-timing -E + "fetch(Threads.@spawn current_task().is_timing_enabled)"`)) + end + # --worker takes default / custom as argument (default/custom arguments # tested in test/parallel.jl) @test errors_not_signals(`$exename --worker=true`) diff --git a/test/threads_exec.jl b/test/threads_exec.jl index ac54dd009390c..d51cc409b01b6 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -3,6 +3,7 @@ using Test using Base.Threads using Base.Threads: SpinLock, threadpoolsize +using LinearAlgebra: peakflops # for cfunction_closure include("testenv.jl") @@ -1312,4 +1313,80 @@ end end end end + +@testset "task time counters" begin + @testset "enabled" begin + try + Base.task_timing(true) + start_time = time_ns() + t = Threads.@spawn peakflops() + wait(t) + end_time = time_ns() + wall_time_delta = end_time - start_time + @test t.is_timing_enabled + @test Base.task_cpu_time_ns(t) > 0 + @test Base.task_wall_time_ns(t) > 0 + @test Base.task_wall_time_ns(t) >= Base.task_cpu_time_ns(t) + @test wall_time_delta > Base.task_wall_time_ns(t) + finally + Base.task_timing(false) + end + end + @testset "disabled" begin + t = Threads.@spawn peakflops() + wait(t) + @test !t.is_timing_enabled + @test Base.task_cpu_time_ns(t) == 0 + @test Base.task_wall_time_ns(t) == 0 + end +end + +@testset "task time counters: lots of spawns" begin + using Dates + try + Base.task_timing(true) + # create more tasks than we have threads. + # - all tasks must have: cpu time <= wall time + # - some tasks must have: cpu time < wall time + # - summing across all tasks we must have: total cpu time <= available cpu time + n_tasks = 2 * Threads.nthreads(:default) + cpu_times = Vector{UInt64}(undef, n_tasks) + wall_times = Vector{UInt64}(undef, n_tasks) + start_time = time_ns() + @sync begin + for i in 1:n_tasks + start_time_i = time_ns() + task_i = Threads.@spawn peakflops() + Threads.@spawn begin + wait(task_i) + end_time_i = time_ns() + wall_time_delta_i = end_time_i - start_time_i + cpu_times[$i] = cpu_time_i = Base.task_cpu_time_ns(task_i) + wall_times[$i] = wall_time_i = Base.task_wall_time_ns(task_i) + # task should have recorded some cpu-time and some wall-time + @test cpu_time_i > 0 + @test wall_time_i > 0 + # task cpu-time cannot be greater than its wall-time + @test wall_time_i >= cpu_time_i + # task wall-time must be less than our manually measured wall-time + # between calling `@spawn` and returning from `wait`. + @test wall_time_delta_i > wall_time_i + end + end + end + end_time = time_ns() + wall_time_delta = (end_time - start_time) + available_cpu_time = wall_time_delta * Threads.nthreads(:default) + summed_cpu_time = sum(cpu_times) + # total CPU time from all tasks can't exceed what was actually available. + @test available_cpu_time > summed_cpu_time + # some tasks must have cpu-time less than their wall-time, because we had more tasks + # than threads. + summed_wall_time = sum(wall_times) + @test summed_wall_time > summed_cpu_time + finally + Base.task_timing(false) + end +end + end # main testset