Skip to content

Commit

Permalink
Merge branch 'main' into update-throttle-rework-2
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp committed Aug 6, 2024
2 parents ab4c1a6 + 6e21828 commit e6bd26f
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 8 deletions.
5 changes: 3 additions & 2 deletions frontend/common/PlutoConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const create_ws_connection = (address, { on_message, on_socket_close }, timeout_

const send_encoded = (message) => {
const encoded = pack(message)
if (socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING) throw new Error("Socket is closed")
socket.send(encoded)
}

Expand Down Expand Up @@ -266,7 +267,7 @@ const default_ws_address = () => ws_address_from_base(window.location.href)
*
* @param {{
* on_unrequested_update: (message: PlutoMessage, by_me: boolean) => void,
* on_reconnect: () => boolean,
* on_reconnect: () => Promise<boolean>,
* on_connection_status: (connection_status: boolean, hopeless: boolean) => void,
* connect_metadata?: Object,
* ws_address?: String,
Expand Down Expand Up @@ -377,7 +378,7 @@ export const create_pluto_connection = async ({
await connect() // reconnect!

console.log(`Starting state sync`, new Date().toLocaleTimeString())
const accept = on_reconnect()
const accept = await on_reconnect()
console.log(`State sync ${accept ? "" : "not "}successful`, new Date().toLocaleTimeString())
on_connection_status(accept, false)
if (!accept) {
Expand Down
12 changes: 10 additions & 2 deletions frontend/components/Editor.js
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,6 @@ patch: ${JSON.stringify(
backend_launch_phase: this.state.backend_launch_phase == null ? null : BackendLaunchPhase.ready,
})

// TODO Do this from julia itself
this.client.send("complete", { query: "sq" }, { notebook_id: this.state.notebook.notebook_id })
this.client.send("complete", { query: "\\sq" }, { notebook_id: this.state.notebook.notebook_id })

Expand Down Expand Up @@ -893,9 +892,18 @@ patch: ${JSON.stringify(
}
}

const on_reconnect = () => {
const on_reconnect = async () => {
console.warn("Reconnected! Checking states")

await this.client.send(
"reset_shared_state",
{},
{
notebook_id: this.state.notebook.notebook_id,
},
false
)

return true
}

Expand Down
2 changes: 1 addition & 1 deletion frontend/components/welcome/Welcome.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export const Welcome = ({ launch_params }) => {
const client_promise = create_pluto_connection({
on_unrequested_update: on_update,
on_connection_status: on_connection_status,
on_reconnect: () => true,
on_reconnect: async () => true,
ws_address: launch_params.pluto_server_url ? ws_address_from_base(launch_params.pluto_server_url) : undefined,
})
client_promise.then(async (client) => {
Expand Down
3 changes: 2 additions & 1 deletion src/evaluation/Throttled.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ This throttle is 'leading' and has some other properties that are specifically d
Inspired by FluxML
See: https://github.com/FluxML/Flux.jl/blob/8afedcd6723112ff611555e350a8c84f4e1ad686/src/utils.jl#L662
"""
function throttled(f::Function, timeout::Real)
function throttled(f::Function, timeout::Real; runtime_multiplier::Float64=0.0)
tlock = ReentrantLock()
iscoolnow = Ref(false)
run_later = Ref(false)
last_runtime = Ref(0.0)

tf = ThrottledFunction(f, timeout, runtime_multiplier, tlock, iscoolnow, run_later, last_runtime)

Expand Down
4 changes: 4 additions & 0 deletions src/runner/PlutoRunner/src/PlutoRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
# loaded on a runner process, which is something that we might want to change
# in the future.

# DEVELOPMENT TIP
# If you are editing this file, you cannot use Revise unfortunately.
# However! You don't need to restart Pluto to test your changes! You just need to restart the notebook from the Pluto main menu, and the new PlutoRunner.jl will be loaded.

module PlutoRunner

# import these two so that they can be imported from Main on the worker process if it launches without the stdlibs in its LOAD_PATH
Expand Down
11 changes: 10 additions & 1 deletion src/webserver/WebServer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ function run!(session::ServerSession)
if HTTP.WebSockets.isclosed(clientstream)
return
end
found_client_id_ref = Ref(Symbol(:none))
try
for message in clientstream
# This stream contains data received over the WebSocket.
Expand All @@ -221,6 +222,9 @@ function run!(session::ServerSession)
end

did_read = true
if found_client_id_ref[] === :none
found_client_id_ref[] = Symbol(parentbody["client_id"])
end
process_ws_message(session, parentbody, clientstream)
catch ex
if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError
Expand All @@ -242,6 +246,11 @@ function run!(session::ServerSession)
bt = stacktrace(catch_backtrace())
@warn "Reading WebSocket client stream failed for unknown reason:" exception = (ex, bt)
end
finally
if haskey(session.connected_clients, found_client_id_ref[])
@debug "Removing client $(found_client_id_ref[]) from connected_clients"
delete!(session.connected_clients, found_client_id_ref[])
end
end
end
catch ex
Expand Down Expand Up @@ -379,7 +388,7 @@ end
"All messages sent over the WebSocket get decoded+deserialized and end up here."
function process_ws_message(session::ServerSession, parentbody::Dict, clientstream)
client_id = Symbol(parentbody["client_id"])
client = get!(session.connected_clients, client_id ) do
client = get!(session.connected_clients, client_id) do
ClientSession(client_id, clientstream, session.options.server.simulated_lag)
end
client.stream = clientstream # it might change when the same client reconnects
Expand Down
7 changes: 6 additions & 1 deletion test/Throttled.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ using Pluto.WorkspaceManager: poll
# we have an initial cooldown period in which f should not fire...
# ...so x is still 1...
@test x[] == 1
sleep(2dt)
sleep(1.5dt)
# ...but after a delay, the call should go through.
@test x[] == 2

# Let's wait for the cooldown period to end
sleep(dt)
# nothing should have changed
@test x[] == 2

# sleep(0) ## ASYNC MAGIC :(

Expand Down

0 comments on commit e6bd26f

Please sign in to comment.