Skip to content

Commit

Permalink
whitespace
Browse files Browse the repository at this point in the history
from #2249

Co-Authored-By: Paul Berg <[email protected]>
  • Loading branch information
fonsp and Pangoraw committed Sep 21, 2022
1 parent ae47b70 commit b9158e0
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 70 deletions.
124 changes: 62 additions & 62 deletions src/evaluation/WorkspaceManager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false

pid = if use_distributed
@debug "Creating workspace process" notebook.path length(notebook.cells)
create_workspaceprocess(;compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler))
create_workspaceprocess(; compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler))
else
pid = Distributed.myid()
if !(isdefined(Main, :PlutoRunner) && Main.PlutoRunner isa Module)
Expand All @@ -61,7 +61,7 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false
end

Distributed.remotecall_eval(Main, [pid], :(PlutoRunner.notebook_id[] = $(notebook.notebook_id)))

remote_log_channel = Core.eval(Main, quote
$(Distributed).RemoteChannel(() -> eval(quote

Expand All @@ -75,18 +75,18 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false
channel
end), $pid)
end)

run_channel = Core.eval(Main, quote
$(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.run_channel)), $pid)
end)
module_name = create_emptyworkspacemodule(pid)

original_LOAD_PATH, original_ACTIVE_PROJECT = Distributed.remotecall_eval(Main, pid, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[]))

workspace = Workspace(;
pid,
notebook_id=notebook.notebook_id,
remote_log_channel,
remote_log_channel,
module_name,
original_LOAD_PATH,
original_ACTIVE_PROJECT,
Expand All @@ -109,17 +109,17 @@ function use_nbpkg_environment((session, notebook)::SN, workspace=nothing)
if workspace.nbpkg_was_active == enabled
return
end

workspace = workspace !== nothing ? workspace : get_workspace((session, notebook))
if workspace.discarded
return
end

workspace.nbpkg_was_active = enabled
if workspace.pid != Distributed.myid()
new_LP = enabled ? ["@", "@stdlib"] : workspace.original_LOAD_PATH
new_AP = enabled ? PkgCompat.env_dir(notebook.nbpkg_ctx) : workspace.original_ACTIVE_PROJECT

Distributed.remotecall_eval(Main, [workspace.pid], quote
copy!(LOAD_PATH, $(new_LP))
Base.ACTIVE_PROJECT[] = $(new_AP)
Expand All @@ -146,10 +146,10 @@ function start_relaying_self_updates((session, notebook)::SN, run_channel::Distr
end

function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.RemoteChannel)
update_throttled, flush_throttled = Pluto.throttled(0.1) do
update_throttled, flush_throttled = Pluto.throttled(0.1) do
Pluto.send_notebook_changes!(Pluto.ClientRequest(session=session, notebook=notebook))
end

while true
try
next_log::Dict{String,Any} = take!(log_channel)
Expand All @@ -164,32 +164,32 @@ function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.R
begin
source_cell_id = if match !== nothing
# the log originated from within the notebook

UUID(fn[match[end]+1:end])
else
# the log originated from a function call defined outside of the notebook

# we will show the log at the currently running cell, at "line -1", i.e. without line info.
next_log["line"] = -1
running_cell_id
end

if running_cell_id != source_cell_id
# the log originated from a function in another cell of the notebook
# we will show the log at the currently running cell, at "line -1", i.e. without line info.
next_log["line"] = -1
end
end

source_cell = get(notebook.cells_dict, source_cell_id, nothing)
running_cell = get(notebook.cells_dict, running_cell_id, nothing)

display_cell = if running_cell === nothing || (source_cell !== nothing && source_cell.output.has_pluto_hook_features)
source_cell
else
running_cell
end

@assert !isnothing(display_cell)

maybe_max_log = findfirst(((key, _),) -> key == "maxlog", next_log["kwargs"])
Expand Down Expand Up @@ -257,7 +257,7 @@ const Distributed_expr = :(
# NOTE: this function only start a worker process using given
# compiler options, it does not resolve paths for notebooks
# compiler configurations passed to it should be resolved before this
function create_workspaceprocess(;compiler_options=CompilerOptions())::Integer
function create_workspaceprocess(; compiler_options=CompilerOptions())::Integer
# run on proc 1 in case Pluto is being used inside a notebook process
# Workaround for "only process 1 can add/remove workers"
pid = Distributed.remotecall_eval(Main, 1, quote
Expand Down Expand Up @@ -288,15 +288,15 @@ function get_workspace(session_notebook::SN; allow_creation::Bool=true)::Union{N
@debug "This should not happen" notebook.process_status
error("Cannot run code in this notebook: it has already shut down.")
end
task = !allow_creation ?
get(workspaces, notebook.notebook_id, nothing) :
get!(workspaces, notebook.notebook_id) do
Task(() -> make_workspace(session_notebook))
end

task = !allow_creation ?
get(workspaces, notebook.notebook_id, nothing) :
get!(workspaces, notebook.notebook_id) do
Task(() -> make_workspace(session_notebook))
end

isnothing(task) && return nothing

istaskstarted(task) || schedule(task)
fetch(task)
end
Expand Down Expand Up @@ -349,8 +349,8 @@ function distributed_exception_result(exs::CompositeException, workspace::Worksp
ex = exs.exceptions |> first

if ex isa Distributed.RemoteException &&
ex.pid == workspace.pid &&
ex.captured.ex isa InterruptException
ex.pid == workspace.pid &&
ex.captured.ex isa InterruptException

(
output_formatted=PlutoRunner.format_output(CapturedException(InterruptException(), [])),
Expand Down Expand Up @@ -390,19 +390,19 @@ end
`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`."
function eval_format_fetch_in_workspace(
session_notebook::Union{SN,Workspace},
expr::Expr,
cell_id::UUID;
ends_with_semicolon::Bool=false,
function_wrapped_info::Union{Nothing,Tuple}=nothing,
forced_expr_id::Union{PlutoRunner.ObjectID,Nothing}=nothing,
known_published_objects::Vector{String}=String[],
user_requested_run::Bool=true,
capture_stdout::Bool=true,
)::PlutoRunner.FormattedCellResult
session_notebook::Union{SN,Workspace},
expr::Expr,
cell_id::UUID;
ends_with_semicolon::Bool=false,
function_wrapped_info::Union{Nothing,Tuple}=nothing,
forced_expr_id::Union{PlutoRunner.ObjectID,Nothing}=nothing,
known_published_objects::Vector{String}=String[],
user_requested_run::Bool=true,
capture_stdout::Bool=true
)::PlutoRunner.FormattedCellResult

workspace = get_workspace(session_notebook)

is_on_this_process = workspace.pid == Distributed.myid()

# if multiple notebooks run on the same process, then we need to `cd` between the different notebook paths
Expand All @@ -412,16 +412,16 @@ function eval_format_fetch_in_workspace(
end
use_nbpkg_environment(session_notebook, workspace)
end

# run the code 🏃‍♀️

# a try block (on this process) to catch an InterruptException
take!(workspace.dowork_token)
early_result = try
# we use [pid] instead of pid to prevent fetching output
Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression(
getfield(Main, $(QuoteNode(workspace.module_name))),
$(QuoteNode(expr)),
getfield(Main, $(QuoteNode(workspace.module_name))),
$(QuoteNode(expr)),
$(workspace.notebook_id),
$cell_id,
$function_wrapped_info,
Expand All @@ -438,38 +438,38 @@ function eval_format_fetch_in_workspace(
end

early_result === nothing ?
format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) :
early_result
format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) :
early_result
end

"Evaluate expression inside the workspace - output is not fetched, errors are rethrown. For internal use."
function eval_in_workspace(session_notebook::Union{SN,Workspace}, expr)
workspace = get_workspace(session_notebook)

Distributed.remotecall_eval(Main, [workspace.pid], :(Core.eval($(workspace.module_name), $(expr |> QuoteNode))))
nothing
end

function format_fetch_in_workspace(
session_notebook::Union{SN,Workspace},
cell_id,
ends_with_semicolon,
known_published_objects::Vector{String}=String[],
showmore_id::Union{PlutoRunner.ObjectDimPair,Nothing}=nothing,
)::PlutoRunner.FormattedCellResult
session_notebook::Union{SN,Workspace},
cell_id,
ends_with_semicolon,
known_published_objects::Vector{String}=String[],
showmore_id::Union{PlutoRunner.ObjectDimPair,Nothing}=nothing,
)::PlutoRunner.FormattedCellResult
workspace = get_workspace(session_notebook)

# instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output.
withtoken(workspace.dowork_token) do
try
Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.formatted_result_of(
$(workspace.notebook_id),
$cell_id,
$ends_with_semicolon,
$cell_id,
$ends_with_semicolon,
$known_published_objects,
$showmore_id,
getfield(Main, $(QuoteNode(workspace.module_name))),
)))
)))
catch ex
distributed_exception_result(CompositeException([ex]), workspace)
end
Expand All @@ -488,13 +488,13 @@ function collect_soft_definitions(session_notebook::SN, modules::Set{Expr})
end


function macroexpand_in_workspace(session_notebook::Union{SN,Workspace}, macrocall, cell_uuid, module_name = nothing)::Tuple{Bool, Any}
function macroexpand_in_workspace(session_notebook::Union{SN,Workspace}, macrocall, cell_id, module_name=nothing)::Tuple{Bool,Any}
workspace = get_workspace(session_notebook)
module_name = module_name === nothing ? workspace.module_name : module_name

Distributed.remotecall_eval(Main, workspace.pid, quote
try
(true, PlutoRunner.try_macroexpand($(module_name), $(cell_uuid), $(macrocall |> QuoteNode)))
(true, PlutoRunner.try_macroexpand($(module_name), $(workspace.notebook_id), $(cell_id), $(macrocall |> QuoteNode)))
catch error
# We have to be careful here, for example a thrown `MethodError()` will contain the called method and arguments.
# which normally would be very useful for debugging, but we can't serialize it!
Expand All @@ -511,7 +511,7 @@ end
"Evaluate expression inside the workspace - output is returned. For internal use."
function eval_fetch_in_workspace(session_notebook::Union{SN,Workspace}, expr)
workspace = get_workspace(session_notebook)

Distributed.remotecall_eval(Main, workspace.pid, :(Core.eval($(workspace.module_name), $(expr |> QuoteNode))))
end

Expand All @@ -525,12 +525,12 @@ end
function move_vars(session_notebook::Union{SN,Workspace}, old_workspace_name::Symbol, new_workspace_name::Union{Nothing,Symbol}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...)
workspace = get_workspace(session_notebook)
new_workspace_name = something(new_workspace_name, workspace.module_name)

Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $methods_to_delete, $module_imports_to_move, $invalidated_cell_uuids)))
end

move_vars(session_notebook::Union{SN,Workspace}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) =
move_vars(session_notebook, bump_workspace_module(session_notebook)..., to_delete, methods_to_delete, module_imports_to_move, invalidated_cell_uuids; kwargs...)
move_vars(session_notebook, bump_workspace_module(session_notebook)..., to_delete, methods_to_delete, module_imports_to_move, invalidated_cell_uuids; kwargs...)

# TODO: delete me
@deprecate(
Expand Down Expand Up @@ -586,7 +586,7 @@ end
"Force interrupt (SIGINT) a workspace, return whether successful"
function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true)::Bool
workspace = get_workspace(session_notebook; allow_creation=false)

if !(workspace isa Workspace)
# verbose && @info "Can't interrupt this notebook: it is not running."
return false
Expand Down Expand Up @@ -626,7 +626,7 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true
end

verbose && println("Still running... starting sequence")
while !isready(workspace.dowork_token)
while !isready(workspace.dowork_token)
for _ in 1:5
verbose && print(" 🔥 ")
Distributed.interrupt(workspace.pid)
Expand Down
16 changes: 8 additions & 8 deletions src/runner/PlutoRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ module CantReturnInPluto
replace_returns_with_error_in_interpolation(ex) = ex
end

function try_macroexpand(mod, cell_uuid, expr)
function try_macroexpand(mod::Module, notebook_id::UUID, cell_id::UUID, expr)
# Remove the precvious cached expansion, so when we error somewhere before we update,
# the old one won't linger around and get run accidentally.
delete!(cell_expanded_exprs, cell_uuid)
delete!(cell_expanded_exprs, cell_id)

# Remove toplevel block, as that screws with the computer and everything
expr_not_toplevel = if expr.head == :toplevel || expr.head == :block
Expand All @@ -288,14 +288,14 @@ function try_macroexpand(mod, cell_uuid, expr)
expr_without_globalrefs = globalref_to_workspaceref(expr_without_return)

has_pluto_hook_features = has_hook_style_pluto_properties_in_expr(expr_without_globalrefs)
expr_to_save = replace_pluto_properties_in_expr(expr_without_globalrefs,
cell_id=cell_uuid,
rerun_cell_function=() -> rerun_cell_from_notebook(cell_uuid),
register_cleanup_function=(fn) -> UseEffectCleanups.register_cleanup(fn, cell_uuid),
expr_to_save = replace_pluto_properties_in_expr(expr_without_globalrefs;
cell_id,
rerun_cell_function=() -> rerun_cell_from_notebook(cell_id),
register_cleanup_function=(fn) -> UseEffectCleanups.register_cleanup(fn, cell_id),
)

did_mention_expansion_time = false
cell_expanded_exprs[cell_uuid] = CachedMacroExpansion(
cell_expanded_exprs[cell_id] = CachedMacroExpansion(
expr_hash(expr),
expr_to_save,
elapsed_ns,
Expand Down Expand Up @@ -525,7 +525,7 @@ function run_expression(
# .... But ideally we wouldn't re-macroexpand and store the error the first time (TODO-ish)
if !haskey(cell_expanded_exprs, cell_id) || cell_expanded_exprs[cell_id].original_expr_hash != expr_hash(expr)
try
try_macroexpand(m, cell_id, expr)
try_macroexpand(m, notebook_id, cell_id, expr)
catch e
result = CapturedException(e, stacktrace(catch_backtrace()))
cell_results[cell_id], cell_runtimes[cell_id] = (result, nothing)
Expand Down

0 comments on commit b9158e0

Please sign in to comment.