Skip to content

Commit

Permalink
EventLoop: yield fibers internally [fixup #14996] (#15215)
Browse files Browse the repository at this point in the history
Refactors the internals of the epoll/kqueue event loop to `yield` the fiber(s) to be resumed instead of blindly calling `Crystal::Scheduler.enqueue`, so the `#run` method becomes the one place responsible for enqueueing the fibers.

The current behavior doesn't change, the `#run` method still enqueues the fiber immediately, but it can now be changed in a single place.

For example, the [execution context shard](https://github.com/ysbaddaden/execution_context) monkey-patches an alternative `#run` method that collects and returns fibers, to prevent parallel enqueues during an evloop run from interrupting that run (:sob:).

Note that the `#close` method still directly enqueues waiting fibers one by one, for now.
  • Loading branch information
ysbaddaden authored Nov 23, 2024
1 parent c2d1a01 commit fe90f27
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 31 deletions.
2 changes: 1 addition & 1 deletion spec/std/crystal/evented/poll_descriptor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require "spec"
class Crystal::Evented::FakeLoop < Crystal::Evented::EventLoop
getter operations = [] of {Symbol, Int32, Crystal::Evented::Arena::Index | Bool}

private def system_run(blocking : Bool) : Nil
private def system_run(blocking : Bool, & : Fiber ->) : Nil
end

private def interrupt : Nil
Expand Down
20 changes: 10 additions & 10 deletions src/crystal/system/unix/epoll/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop
end
{% end %}

private def system_run(blocking : Bool) : Nil
private def system_run(blocking : Bool, & : Fiber ->) : Nil
Crystal.trace :evloop, "run", blocking: blocking ? 1 : 0

# wait for events (indefinitely when blocking)
Expand All @@ -72,41 +72,41 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop
# TODO: panic if epoll_event.value.events != LibC::EPOLLIN (could be EPOLLERR or EPOLLHUP)
Crystal.trace :evloop, "interrupted"
@eventfd.read
# OPTIMIZE: only reset interrupted before a blocking wait
@interrupted.clear
when @timerfd.fd
# TODO: panic if epoll_event.value.events != LibC::EPOLLIN (could be EPOLLERR or EPOLLHUP)
Crystal.trace :evloop, "timer"
timer_triggered = true
else
process_io(epoll_event)
process_io(epoll_event) { |fiber| yield fiber }
end
end

process_timers(timer_triggered)
# OPTIMIZE: only process timers when timer_triggered (?)
process_timers(timer_triggered) { |fiber| yield fiber }
end

private def process_io(epoll_event : LibC::EpollEvent*) : Nil
private def process_io(epoll_event : LibC::EpollEvent*, &) : Nil
index = Evented::Arena::Index.new(epoll_event.value.data.u64)
events = epoll_event.value.events

Crystal.trace :evloop, "event", fd: index.index, index: index.to_i64, events: events

Evented.arena.get?(index) do |pd|
if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
return
end

if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end

if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end
end
end
Expand Down
26 changes: 16 additions & 10 deletions src/crystal/system/unix/evented/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop

# NOTE: thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking)
system_run(blocking) do |fiber|
Crystal::Scheduler.enqueue(fiber)
end
true
end

Expand Down Expand Up @@ -299,11 +301,15 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop

Evented.arena.free(index) do |pd|
pd.value.@readers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event))
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
end)
end

pd.value.@writers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event))
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
end)
end

pd.value.remove(io.fd)
Expand Down Expand Up @@ -418,15 +424,15 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
# Thread unsafe: we must hold the poll descriptor waiter lock for the whole
# duration of the dequeue/resume_io otherwise we might conflict with timers
# trying to cancel an IO event.
protected def unsafe_resume_io(event : Evented::Event*) : Bool
protected def unsafe_resume_io(event : Evented::Event*, &) : Bool
# we only partially own the poll descriptor; thanks to the lock we know that
# another thread won't dequeue it, yet it may still be in the timers queue,
# which at worst may be waiting on the lock to be released, so event* can be
# dereferenced safely.

if !event.value.wake_at? || delete_timer(event)
# no timeout or we canceled it: we fully own the event
Crystal::Scheduler.enqueue(event.value.fiber)
yield event.value.fiber
true
else
# failed to cancel the timeout so the timer owns the event (by rule)
Expand All @@ -439,7 +445,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
# Shall be called after processing IO events. IO events with a timeout that
# have succeeded shall already have been removed from `@timers` otherwise the
# fiber could be resumed twice!
private def process_timers(timer_triggered : Bool) : Nil
private def process_timers(timer_triggered : Bool, &) : Nil
# collect ready timers before processing them —this is safe— to avoid a
# deadlock situation when another thread tries to process a ready IO event
# (in poll descriptor waiters) with a timeout (same event* in timers)
Expand All @@ -458,11 +464,11 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
end

buffer.to_slice[0, size].each do |event|
process_timer(event)
process_timer(event) { |fiber| yield fiber }
end
end

private def process_timer(event : Evented::Event*)
private def process_timer(event : Evented::Event*, &)
# we dequeued the event from timers, and by rule we own it, so event* can
# safely be dereferenced:
fiber = event.value.fiber
Expand Down Expand Up @@ -492,7 +498,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
raise RuntimeError.new("BUG: unexpected event in timers: #{event.value}%s\n")
end

Crystal::Scheduler.enqueue(fiber)
yield fiber
end

# internals: system
Expand All @@ -505,7 +511,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
#
# The `PollDescriptor` of IO events can be retrieved using the *index*
# from the system event's user data.
private abstract def system_run(blocking : Bool) : Nil
private abstract def system_run(blocking : Bool, & : Fiber ->) : Nil

# Add *fd* to the polling system, setting *index* as user data.
protected abstract def system_add(fd : Int32, index : Index) : Nil
Expand Down
21 changes: 11 additions & 10 deletions src/crystal/system/unix/kqueue/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
end
{% end %}

private def system_run(blocking : Bool) : Nil
private def system_run(blocking : Bool, & : Fiber ->) : Nil
buffer = uninitialized LibC::Kevent[128]

Crystal.trace :evloop, "run", blocking: blocking ? 1 : 0
Expand All @@ -89,11 +89,12 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
# nothing special
timer_triggered = true
else
process_io(kevent)
process_io(kevent) { |fiber| yield fiber }
end
end

process_timers(timer_triggered)
# OPTIMIZE: only process timers when timer_triggered (?)
process_timers(timer_triggered) { |fiber| yield fiber }
end

private def process_interrupt?(kevent)
Expand All @@ -114,7 +115,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
false
end

private def process_io(kevent : LibC::Kevent*) : Nil
private def process_io(kevent : LibC::Kevent*, &) : Nil
index =
{% if flag?(:bits64) %}
Evented::Arena::Index.new(kevent.value.udata.address)
Expand All @@ -130,25 +131,25 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF
# apparently some systems may report EOF on write with EVFILT_READ instead
# of EVFILT_WRITE, so let's wake all waiters:
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
return
end

case kevent.value.filter
when LibC::EVFILT_READ
if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR
# OPTIMIZE: pass errno (kevent.data) through PollDescriptor
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
else
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end
when LibC::EVFILT_WRITE
if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR
# OPTIMIZE: pass errno (kevent.data) through PollDescriptor
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
else
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) { |fiber| yield fiber } }
end
end
end
Expand Down

0 comments on commit fe90f27

Please sign in to comment.