Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
Integrate tracing into runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
filiptibell committed Jan 27, 2024
1 parent fa4e673 commit 9941205
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 21 deletions.
124 changes: 124 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async-executor = "1.8"
concurrent-queue = "2.4"
event-listener = "4.0"
futures-lite = "2.2"
tracing = "0.1"

mlua = { version = "0.9.5", features = [
"luau",
Expand All @@ -25,6 +26,7 @@ mlua = { version = "0.9.5", features = [
[dev-dependencies]
async-fs = "2.1"
async-io = "2.3"
tracing-subscriber = "0.3"

[lints.clippy]
all = { level = "deny", priority = -3 }
Expand Down
2 changes: 2 additions & 0 deletions examples/basic_sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use mlua_luau_runtime::Runtime;
const MAIN_SCRIPT: &str = include_str!("./lua/basic_sleep.luau");

pub fn main() -> LuaResult<()> {
tracing_subscriber::fmt::init();

// Set up persistent Lua environment
let lua = Lua::new();
lua.globals().set(
Expand Down
2 changes: 2 additions & 0 deletions examples/basic_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use mlua_luau_runtime::{LuaSpawnExt, Runtime};
const MAIN_SCRIPT: &str = include_str!("./lua/basic_spawn.luau");

pub fn main() -> LuaResult<()> {
tracing_subscriber::fmt::init();

// Set up persistent Lua environment
let lua = Lua::new();
lua.globals().set(
Expand Down
2 changes: 2 additions & 0 deletions examples/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use async_io::block_on;
const MAIN_SCRIPT: &str = include_str!("./lua/callbacks.luau");

pub fn main() -> LuaResult<()> {
tracing_subscriber::fmt::init();

// Set up persistent Lua environment
let lua = Lua::new();

Expand Down
2 changes: 2 additions & 0 deletions examples/lots_of_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const MAIN_SCRIPT: &str = include_str!("./lua/lots_of_threads.luau");
const ONE_NANOSECOND: Duration = Duration::from_nanos(1);

pub fn main() -> LuaResult<()> {
tracing_subscriber::fmt::init();

// Set up persistent Lua environment
let lua = Lua::new();
let rt = Runtime::new(&lua);
Expand Down
2 changes: 2 additions & 0 deletions examples/scheduler_ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use mlua_luau_runtime::Runtime;
const MAIN_SCRIPT: &str = include_str!("./lua/scheduler_ordering.luau");

pub fn main() -> LuaResult<()> {
tracing_subscriber::fmt::init();

// Set up persistent Lua environment
let lua = Lua::new();
let rt = Runtime::new(&lua);
Expand Down
2 changes: 2 additions & 0 deletions lib/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ impl ThreadQueue {
) -> LuaResult<()> {
let thread = thread.into_lua_thread(lua)?;
let args = args.into_lua_multi(lua)?;

tracing::trace!("pushing item to queue with {} args", args.len());
let stored = ThreadWithArgs::new(lua, thread, args)?;

self.queue.push(stored).into_lua_err()?;
Expand Down
74 changes: 53 additions & 21 deletions lib/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures_lite::prelude::*;
use mlua::prelude::*;

use async_executor::{Executor, LocalExecutor};
use tracing::Instrument;

use super::{
error_callback::ThreadErrorCallback, queue::ThreadQueue, traits::IntoLuaThread,
Expand Down Expand Up @@ -71,6 +72,7 @@ impl<'lua> Runtime<'lua> {
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> {
tracing::debug!(deferred = false, "new runtime thread");
self.queue_spawn.push_item(self.lua, thread, args)
}

Expand All @@ -90,6 +92,7 @@ impl<'lua> Runtime<'lua> {
thread: impl IntoLuaThread<'lua>,
args: impl IntoLuaMulti<'lua>,
) -> LuaResult<()> {
tracing::debug!(deferred = true, "new runtime thread");
self.queue_defer.push_item(self.lua, thread, args)
}

Expand Down Expand Up @@ -207,38 +210,65 @@ impl<'lua> Runtime<'lua> {
when there are new Lua threads to enqueue and potentially more work to be done.
*/
let fut = async {
let process_thread = |thread: LuaThread<'lua>, args| {
// NOTE: Thread may have been cancelled from Lua
// before we got here, so we need to check it again
if thread.status() == LuaThreadStatus::Resumable {
let mut stream = thread.clone().into_async::<_, LuaValue>(args);
lua_exec
.spawn(async move {
// Only run stream until first coroutine.yield or completion. We will
// drop it right away to clear stack space since detached tasks dont drop
// until the executor drops (https://github.com/smol-rs/smol/issues/294)
let res = stream.next().await.unwrap();
if let Err(e) = &res {
self.error_callback.call(e);
}
})
.detach();
}
};

loop {
let fut_spawn = self.queue_spawn.wait_for_item(); // 1
let fut_defer = self.queue_defer.wait_for_item(); // 2
let fut_tick = lua_exec.tick(); // 3

fut_spawn.or(fut_defer).or(fut_tick).await;

let process_thread = |thread: LuaThread<'lua>, args| {
// NOTE: Thread may have been cancelled from Lua
// before we got here, so we need to check it again
if thread.status() == LuaThreadStatus::Resumable {
let mut stream = thread.clone().into_async::<_, LuaValue>(args);
lua_exec
.spawn(async move {
// Only run stream until first coroutine.yield or completion. We will
// drop it right away to clear stack space since detached tasks dont drop
// until the executor drops (https://github.com/smol-rs/smol/issues/294)
let res = stream.next().await.unwrap();
if let Err(e) = &res {
self.error_callback.call(e);
}
})
.detach();

// 3
let mut num_processed = 0;
let span_tick = tracing::debug_span!("tick_executor");
let fut_tick = async {
lua_exec.tick().await;
// NOTE: Try to do as much work as possible instead of just a single tick()
num_processed += 1;
while lua_exec.try_tick() {
num_processed += 1;
}
};

// 1 + 2 + 3
fut_spawn
.or(fut_defer)
.or(fut_tick.instrument(span_tick.or_current()))
.await;

// Emit traces
if num_processed > 0 {
tracing::trace!(num_processed, "tasks_processed");
}

// Process spawned threads first, then deferred threads
let mut num_spawned = 0;
let mut num_deferred = 0;
for (thread, args) in self.queue_spawn.drain_items(self.lua) {
process_thread(thread, args);
num_spawned += 1;
}
for (thread, args) in self.queue_defer.drain_items(self.lua) {
process_thread(thread, args);
num_deferred += 1;
}
if num_spawned > 0 || num_deferred > 0 {
tracing::trace!(num_spawned, num_deferred, "tasks_spawned");
}

// Empty executor = we didn't spawn any new Lua tasks
Expand All @@ -249,7 +279,9 @@ impl<'lua> Runtime<'lua> {
}
};

main_exec.run(fut).await;
// Run the executor inside a span until all lua threads complete
let span = tracing::debug_span!("run_executor");
main_exec.run(fut).instrument(span.or_current()).await;

// Clean up
self.lua.remove_app_data::<Weak<Executor>>();
Expand Down
Loading

0 comments on commit 9941205

Please sign in to comment.