Skip to content

Commit

Permalink
Performance: state update throttle rework (#2979)
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp authored Aug 6, 2024
1 parent 31569ff commit 6e21828
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/evaluation/Run.jl
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ function run_reactive_core!(

# Send intermediate updates to the clients at most 20 times / second during a reactive run. (The effective speed of a slider is still unbounded, because the last update is not throttled.)
# flush_send_notebook_changes_throttled,
send_notebook_changes_throttled, flush_notebook_changes = throttled(1.0 / 20) do
send_notebook_changes_throttled, flush_notebook_changes = throttled(1.0 / 20; runtime_multiplier=2.0) do
send_notebook_changes!(ClientRequest(; session, notebook))
end
send_notebook_changes_throttled()
Expand Down
16 changes: 11 additions & 5 deletions src/evaluation/Throttled.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Base.Threads

"""
throttled(f::Function, timeout::Real)
Expand All @@ -12,27 +13,32 @@ This throttle is 'leading' and has some other properties that are specifically d
Inspired by FluxML
See: https://github.com/FluxML/Flux.jl/blob/8afedcd6723112ff611555e350a8c84f4e1ad686/src/utils.jl#L662
"""
function throttled(f::Function, timeout::Real)
function throttled(f::Function, timeout::Real; runtime_multiplier::Float64=0.0)
tlock = ReentrantLock()
iscoolnow = Ref(false)
run_later = Ref(false)
last_runtime = Ref(0.0)

function flush()
lock(tlock) do
run_later[] = false
f()
last_runtime[] = @elapsed result = f()
result
end
end

function schedule()
@async begin
sleep(timeout)
# if the last runtime was quite long, increase the sleep period to match.
Timer(timeout + last_runtime[] * runtime_multiplier) do _t
if run_later[]
flush()
schedule()
else
iscoolnow[] = true
end
iscoolnow[] = true
end
end
# we initialize hot, and start the cooldown period immediately
schedule()

function throttled_f()
Expand Down
2 changes: 1 addition & 1 deletion src/webserver/SessionActions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ function add(session::ServerSession, notebook::Notebook; run_async::Bool=true)
end
end

notebook.status_tree.update_listener_ref[] = first(throttled(1.0 / 20) do
notebook.status_tree.update_listener_ref[] = first(throttled(1.0 / 5; runtime_multiplier=2.0) do
# TODO: this throttle should be trailing
Pluto.send_notebook_changes!(Pluto.ClientRequest(; session, notebook))
end)
Expand Down
31 changes: 30 additions & 1 deletion test/Throttled.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Pluto:throttled
using Pluto.WorkspaceManager: poll

@testset "Throttled" begin
x = Ref(0)
Expand All @@ -19,9 +20,14 @@ import Pluto:throttled
# we have an initial cooldown period in which f should not fire...
# ...so x is still 1...
@test x[] == 1
sleep(2dt)
sleep(1.5dt)
# ...but after a delay, the call should go through.
@test x[] == 2

# Let's wait for the cooldown period to end
sleep(dt)
# nothing should have changed
@test x[] == 2

# sleep(0) ## ASYNC MAGIC :(

Expand Down Expand Up @@ -81,4 +87,27 @@ import Pluto:throttled
sleep(2dt)
@test x[] == 13

####

ft()
@test poll(2dt, dt/60) do
x[] == 14
end
# immediately fire again, right after the last fire
ft()
ft()
# this should not do anything, because we are still in the cooldown period
@test x[] == 14
# not even after a little while
sleep(0.1dt)
@test x[] == 14

# but eventually, our call should get queued
sleep(dt)
@test x[] == 15
sleep(2dt)

####


end

0 comments on commit 6e21828

Please sign in to comment.