diff --git a/frontend/common/PlutoConnection.js b/frontend/common/PlutoConnection.js index 8cacfd617a..e6735b1bbe 100644 --- a/frontend/common/PlutoConnection.js +++ b/frontend/common/PlutoConnection.js @@ -108,6 +108,7 @@ const create_ws_connection = (address, { on_message, on_socket_close }, timeout_ const send_encoded = (message) => { const encoded = pack(message) + if (socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING) throw new Error("Socket is closed") socket.send(encoded) } @@ -266,7 +267,7 @@ const default_ws_address = () => ws_address_from_base(window.location.href) * * @param {{ * on_unrequested_update: (message: PlutoMessage, by_me: boolean) => void, - * on_reconnect: () => boolean, + * on_reconnect: () => Promise, * on_connection_status: (connection_status: boolean, hopeless: boolean) => void, * connect_metadata?: Object, * ws_address?: String, @@ -377,7 +378,7 @@ export const create_pluto_connection = async ({ await connect() // reconnect! console.log(`Starting state sync`, new Date().toLocaleTimeString()) - const accept = on_reconnect() + const accept = await on_reconnect() console.log(`State sync ${accept ? "" : "not "}successful`, new Date().toLocaleTimeString()) on_connection_status(accept, false) if (!accept) { diff --git a/frontend/components/Editor.js b/frontend/components/Editor.js index 85935512bd..b1a705eedd 100644 --- a/frontend/components/Editor.js +++ b/frontend/components/Editor.js @@ -861,7 +861,6 @@ patch: ${JSON.stringify( backend_launch_phase: this.state.backend_launch_phase == null ? null : BackendLaunchPhase.ready, }) - // TODO Do this from julia itself this.client.send("complete", { query: "sq" }, { notebook_id: this.state.notebook.notebook_id }) this.client.send("complete", { query: "\\sq" }, { notebook_id: this.state.notebook.notebook_id }) @@ -893,9 +892,18 @@ patch: ${JSON.stringify( } } - const on_reconnect = () => { + const on_reconnect = async () => { console.warn("Reconnected! Checking states") + await this.client.send( + "reset_shared_state", + {}, + { + notebook_id: this.state.notebook.notebook_id, + }, + false + ) + return true } diff --git a/frontend/components/welcome/Welcome.js b/frontend/components/welcome/Welcome.js index f836db7cd7..2f2c20acf8 100644 --- a/frontend/components/welcome/Welcome.js +++ b/frontend/components/welcome/Welcome.js @@ -68,7 +68,7 @@ export const Welcome = ({ launch_params }) => { const client_promise = create_pluto_connection({ on_unrequested_update: on_update, on_connection_status: on_connection_status, - on_reconnect: () => true, + on_reconnect: async () => true, ws_address: launch_params.pluto_server_url ? ws_address_from_base(launch_params.pluto_server_url) : undefined, }) client_promise.then(async (client) => { diff --git a/src/evaluation/Throttled.jl b/src/evaluation/Throttled.jl index ecb6b5c4d0..a481069f30 100644 --- a/src/evaluation/Throttled.jl +++ b/src/evaluation/Throttled.jl @@ -63,10 +63,11 @@ This throttle is 'leading' and has some other properties that are specifically d Inspired by FluxML See: https://github.com/FluxML/Flux.jl/blob/8afedcd6723112ff611555e350a8c84f4e1ad686/src/utils.jl#L662 """ -function throttled(f::Function, timeout::Real) +function throttled(f::Function, timeout::Real; runtime_multiplier::Float64=0.0) tlock = ReentrantLock() iscoolnow = Ref(false) run_later = Ref(false) + last_runtime = Ref(0.0) tf = ThrottledFunction(f, timeout, runtime_multiplier, tlock, iscoolnow, run_later, last_runtime) diff --git a/src/runner/PlutoRunner/src/PlutoRunner.jl b/src/runner/PlutoRunner/src/PlutoRunner.jl index 2a8cf72690..dbf52ac55d 100644 --- a/src/runner/PlutoRunner/src/PlutoRunner.jl +++ b/src/runner/PlutoRunner/src/PlutoRunner.jl @@ -17,6 +17,10 @@ # loaded on a runner process, which is something that we might want to change # in the future. +# DEVELOPMENT TIP +# If you are editing this file, you cannot use Revise unfortunately. +# However! You don't need to restart Pluto to test your changes! You just need to restart the notebook from the Pluto main menu, and the new PlutoRunner.jl will be loaded. + module PlutoRunner # import these two so that they can be imported from Main on the worker process if it launches without the stdlibs in its LOAD_PATH diff --git a/src/webserver/WebServer.jl b/src/webserver/WebServer.jl index a632fc685a..dbbfded50d 100644 --- a/src/webserver/WebServer.jl +++ b/src/webserver/WebServer.jl @@ -206,6 +206,7 @@ function run!(session::ServerSession) if HTTP.WebSockets.isclosed(clientstream) return end + found_client_id_ref = Ref(Symbol(:none)) try for message in clientstream # This stream contains data received over the WebSocket. @@ -221,6 +222,9 @@ function run!(session::ServerSession) end did_read = true + if found_client_id_ref[] === :none + found_client_id_ref[] = Symbol(parentbody["client_id"]) + end process_ws_message(session, parentbody, clientstream) catch ex if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError @@ -242,6 +246,11 @@ function run!(session::ServerSession) bt = stacktrace(catch_backtrace()) @warn "Reading WebSocket client stream failed for unknown reason:" exception = (ex, bt) end + finally + if haskey(session.connected_clients, found_client_id_ref[]) + @debug "Removing client $(found_client_id_ref[]) from connected_clients" + delete!(session.connected_clients, found_client_id_ref[]) + end end end catch ex @@ -379,7 +388,7 @@ end "All messages sent over the WebSocket get decoded+deserialized and end up here." function process_ws_message(session::ServerSession, parentbody::Dict, clientstream) client_id = Symbol(parentbody["client_id"]) - client = get!(session.connected_clients, client_id ) do + client = get!(session.connected_clients, client_id) do ClientSession(client_id, clientstream, session.options.server.simulated_lag) end client.stream = clientstream # it might change when the same client reconnects diff --git a/test/Throttled.jl b/test/Throttled.jl index 385ff61b43..b04c9e2732 100644 --- a/test/Throttled.jl +++ b/test/Throttled.jl @@ -20,9 +20,14 @@ using Pluto.WorkspaceManager: poll # we have an initial cooldown period in which f should not fire... # ...so x is still 1... @test x[] == 1 - sleep(2dt) + sleep(1.5dt) # ...but after a delay, the call should go through. @test x[] == 2 + + # Let's wait for the cooldown period to end + sleep(dt) + # nothing should have changed + @test x[] == 2 # sleep(0) ## ASYNC MAGIC :(