Skip to content

Commit

Permalink
watcher/linux: remove periodic polling
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Nov 25, 2023
1 parent 20aa921 commit 3b7356b
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 126 deletions.
7 changes: 3 additions & 4 deletions devel/include/detail/wtr/watcher/adapter/darwin/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#if defined(__APPLE__)

#include "wtr/watcher.hpp"
#include <atomic>
#include <chrono>
#include <CoreFoundation/CoreFoundation.h>
#include <CoreServices/CoreServices.h>
Expand Down Expand Up @@ -359,20 +358,20 @@ close_event_stream(std::shared_ptr<sysres_type> const& sysres) noexcept -> bool
return false;
}

inline auto block_while(std::atomic<bool>& b)
inline auto block_while(semabin const& is_living)
{
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

while (b) sleep_for(16ms);
while (is_living.state() == semabin::state::pending) sleep_for(16ms);
}

} /* namespace */

inline auto watch(
std::filesystem::path const& path,
::wtr::watcher::event::callback const& callback,
std::atomic<bool>& is_living) noexcept -> bool
semabin const& is_living) noexcept -> bool
{
auto&& [ok, sysres] = open_event_stream(path, callback);
return ok ? (block_while(is_living), close_event_stream(sysres)) : false;
Expand Down
44 changes: 27 additions & 17 deletions devel/include/detail/wtr/watcher/adapter/linux/fanotify/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct ke_fa_ev {
struct sysres {
bool ok = false;
ke_fa_ev ke{};
semabin const& il{};
adapter::ep ep{};
};

Expand All @@ -90,19 +91,23 @@ inline auto do_mark =
sends diagnostics on warnings and errors.
Walks the given base path, recursively,
marking each directory along the way. */
inline auto make_sysres =
[](char const* const base_path, auto const& cb) -> sysres
inline auto make_sysres = [](
char const* const base_path,
auto const& cb,
semabin const& is_living) -> sysres
{
auto do_error = [&](auto&& msg)
{ return (adapter::do_error(std::move(msg), base_path, cb), sysres{}); };
int fa_fd = fanotify_init(ke_fa_ev::init_flags, ke_fa_ev::init_io_flags);
if (fa_fd < 1) {
do_error("e/sys/fanotify_init@", base_path, cb);
return {.ok = false};
}
if (fa_fd < 1) return do_error("e/sys/fanotify_init@");
walkdir_do(base_path, [&](auto dir) { do_mark(dir, fa_fd, cb); });
auto ep = make_ep(base_path, cb, is_living.fd, fa_fd);
if (ep.fd < 1) return (close(fa_fd), do_error("e/self/resource@"));
return {
.ok = true,
.ke{.fd = fa_fd},
.ep = make_ep(base_path, cb, fa_fd),
.il = is_living,
.ep = ep,
};
};

Expand Down Expand Up @@ -297,26 +302,31 @@ inline auto do_ev_recv =
};

inline auto watch =
[](char const* const path, auto const& cb, auto const& is_living) -> bool
[](char const* const path, auto const& cb, semabin const& is_living) -> bool
{
auto sr = make_sysres(path, cb);
auto sr = make_sysres(path, cb, is_living);
auto do_error = [&](auto&& msg) -> bool
{ return (close_sysres(sr), adapter::do_error(msg, path, cb)); };
auto is_ev_of = [&](int nth, int fd) -> bool
{ return sr.ep.interests[nth].data.fd == fd; };

if (! sr.ok) return do_error("e/self/resource@");
while (is_living) [[likely]] {
while (true) {
int ep_c =
epoll_wait(sr.ep.fd, sr.ep.interests, sr.ep.q_ulim, sr.ep.wake_ms);
if (ep_c < 0)
return do_error("e/sys/epoll_wait@");
else if (ep_c > 0) [[likely]]
for (int n = 0; n < ep_c; n++)
if (sr.ep.interests[n].data.fd == sr.ke.fd) [[likely]]
if (! do_ev_recv(path, cb, sr)) [[unlikely]]
return do_error("e/self/ev_recv@");
else if (ep_c == 0)
continue;
else
for (int n = 0; n < ep_c; ++n)
if (is_ev_of(n, sr.il.fd))
return sr.il.state() == semabin::state::released
? close_sysres(sr)
: do_error("e/self/semabin@");
else if (is_ev_of(n, sr.ke.fd) && ! do_ev_recv(path, cb, sr))
return do_error("e/self/ev_recv@");
}

return close_sysres(sr);
};

} /* namespace detail::wtr::watcher::adapter::fanotify */
Expand Down
34 changes: 22 additions & 12 deletions devel/include/detail/wtr/watcher/adapter/linux/inotify/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct ke_in_ev {
struct sysres {
bool ok = false;
ke_in_ev ke{};
semabin const& il{};
adapter::ep ep{};
};

Expand All @@ -114,8 +115,10 @@ inline auto do_mark =
return do_error("w/sys/not_watched@", dirpath, cb);
};

inline auto make_sysres =
[](char const* const base_path, auto const& cb) -> sysres
inline auto make_sysres = [](
char const* const base_path,
auto const& cb,
semabin const& is_living) -> sysres
{
auto do_error = [&](auto&& msg)
{ return (adapter::do_error(std::move(msg), base_path, cb), sysres{}); };
Expand All @@ -126,7 +129,7 @@ inline auto make_sysres =
walkdir_do(
base_path,
[&](char const* const dir) { do_mark(dir, in_fd, dm, cb); });
auto ep = make_ep(base_path, cb, in_fd);
auto ep = make_ep(base_path, cb, is_living.fd, in_fd);
if (dm.empty() || ep.fd < 0)
return (close(in_fd), do_error("e/self/resource@"));
return sysres{
Expand All @@ -135,6 +138,7 @@ inline auto make_sysres =
.fd = in_fd,
.dm = std::move(dm),
},
.il = is_living,
.ep = ep,
};
};
Expand Down Expand Up @@ -325,25 +329,31 @@ inline auto do_ev_recv =
};

inline auto watch =
[](char const* const path, auto const& cb, auto const& is_living) -> bool
[](char const* const path, auto const& cb, semabin const& is_living) -> bool
{
auto sr = make_sysres(path, cb);
auto sr = make_sysres(path, cb, is_living);
auto do_error = [&](auto&& msg) -> bool
{ return (close_sysres(sr), adapter::do_error(msg, path, cb)); };
auto is_ev_of = [&](int nth, int fd) -> bool
{ return sr.ep.interests[nth].data.fd == fd; };

if (! sr.ok) return do_error("e/self/resource@");
while (is_living) {
while (true) {
int ep_c =
epoll_wait(sr.ep.fd, sr.ep.interests, sr.ep.q_ulim, sr.ep.wake_ms);
if (ep_c < 0)
return do_error("e/sys/epoll_wait@");
else if (ep_c > 0) [[likely]]
for (int n = 0; n < ep_c; n++)
if (sr.ep.interests[n].data.fd == sr.ke.fd) [[likely]]
if (! do_ev_recv(path, cb, sr)) [[unlikely]]
return do_error("e/self/ev_recv@");
else if (ep_c == 0)
continue;
else
for (int n = 0; n < ep_c; ++n)
if (is_ev_of(n, sr.il.fd))
return sr.il.state() == semabin::state::released
? close_sysres(sr)
: do_error("e/self/semabin@");
else if (is_ev_of(n, sr.ke.fd) && ! do_ev_recv(path, cb, sr))
return do_error("e/self/ev_recv@");
}
return close_sysres(sr);
};

} /* namespace detail::wtr::watcher::adapter::inotify */
Expand Down
27 changes: 16 additions & 11 deletions devel/include/detail/wtr/watcher/adapter/linux/sysres.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ struct ep {
static constexpr auto q_ulim = 64;
/* The delay, in milliseconds, while
`epoll_wait` will 'pause' us for
until we are woken up. We use the
time after this to check if we're
still alive, then re-enter epoll.
until we are woken up. We check if
we are still alive through the fd
from our semaphore-like eventfd.
*/
static constexpr auto wake_ms = 16;
static constexpr auto wake_ms = -1;

int fd = -1;
epoll_event interests[q_ulim]{};
Expand All @@ -55,8 +55,11 @@ inline auto do_warn =
[](std::string&& msg, auto const& path, auto const& cb) -> bool
{ return (do_error(std::move(msg), path, cb), true); };

inline auto make_ep =
[](char const* const base_path, auto const& cb, int ev_fd) -> ep
inline auto make_ep = [](
char const* const base_path,
auto const& cb,
int ev_il_fd,
int ev_fs_fd) -> ep
{
auto do_error = [&](auto&& msg)
{ return (adapter::do_error(msg, base_path, cb), ep{}); };
Expand All @@ -65,11 +68,13 @@ inline auto make_ep =
#else
int fd = epoll_create1(EPOLL_CLOEXEC);
#endif
auto want_ev = epoll_event{.events = EPOLLIN, .data{.fd = ev_fd}};
int ec = epoll_ctl(fd, EPOLL_CTL_ADD, ev_fd, &want_ev);
return fd < 0 ? do_error("e/sys/epoll_create@")
: ec < 0 ? (close(fd), do_error("e/sys/epoll_ctl@"))
: ep{.fd = fd};
auto want_ev_fs = epoll_event{.events = EPOLLIN, .data{.fd = ev_fs_fd}};
auto want_ev_il = epoll_event{.events = EPOLLIN, .data{.fd = ev_il_fd}};
bool ctl_ok = epoll_ctl(fd, EPOLL_CTL_ADD, ev_fs_fd, &want_ev_fs) >= 0
&& epoll_ctl(fd, EPOLL_CTL_ADD, ev_il_fd, &want_ev_il) >= 0;
return fd < 0 ? do_error("e/sys/epoll_create@")
: ! ctl_ok ? (close(fd), do_error("e/sys/epoll_ctl@"))
: ep{.fd = fd};
};

inline auto is_dir(char const* const path) -> bool
Expand Down
4 changes: 2 additions & 2 deletions devel/include/detail/wtr/watcher/adapter/warthog/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ inline bool tend_bucket(
inline auto watch(
std::filesystem::path const& path,
::wtr::watcher::event::callback const& callback,
std::atomic<bool>& is_living) noexcept -> bool
semabin const& is_living) noexcept -> bool
{
using std::this_thread::sleep_for, std::chrono::milliseconds;

Expand All @@ -224,7 +224,7 @@ inline auto watch(

static constexpr auto delay_ms = 16;

while (is_living) {
while (is_living.state() == semabin::state::pending) {
if (
! tend_bucket(path, callback, bucket) || ! scan(path, callback, bucket)) {
return false;
Expand Down
7 changes: 3 additions & 4 deletions devel/include/detail/wtr/watcher/adapter/windows/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#if defined(_WIN32)

#include "wtr/watcher.hpp"
#include <atomic>
#include <chrono>
#include <filesystem>
#include <string>
Expand Down Expand Up @@ -191,7 +190,7 @@ inline auto do_event_send(
else {
return false;
}
} // namespace
}

} // namespace

Expand All @@ -202,7 +201,7 @@ inline auto do_event_send(
inline auto watch(
std::filesystem::path const& path,
::wtr::watcher::event::callback const& callback,
std::atomic<bool>& is_living) noexcept -> bool
semabin const& is_living) noexcept -> bool
{
using namespace ::wtr::watcher;

Expand All @@ -213,7 +212,7 @@ inline auto watch(

while (is_valid(w) && has_event(w)) { do_event_send(w, callback); }

while (is_living) {
while (is_living.state() == semabin::state::pending) {
ULONG_PTR completion_key{0};
LPOVERLAPPED overlap{nullptr};

Expand Down
81 changes: 81 additions & 0 deletions devel/include/detail/wtr/watcher/semabin.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#pragma once

#include <atomic>

#ifdef __linux__
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>
#endif

namespace detail::wtr::watcher {

/* A semaphore which is pollable on Linux.
On all platforms, this behaves like an
atomic boolean flag. (On non-Linux, this
literally is an atomic boolean flag.)
On Linux, this is an eventfd in semaphore
mode, which means that it can be waited on
with poll() and friends. */

class semabin {
public:
enum state { pending, released, error };

private:
mutable std::atomic<state> is = pending;

public:
#ifdef __linux__

int const fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);

inline auto release() noexcept -> state
{
auto write_ev = [this]()
{
if (eventfd_write(this->fd, 1) == 0)
return released;
else
return error;
};

if (this->is == released)
return released;
else
return this->is = write_ev();
}

inline auto state() const noexcept -> state
{
auto read_ev = [this]()
{
uint64_t _ = 0;
if (eventfd_read(this->fd, &_) == 0)
return released;
else if (errno == EAGAIN)
return pending;
else
return error;
};

if (this->is == pending)
return pending;
else
return this->is = read_ev();
}

inline ~semabin() noexcept { close(this->fd); }

#else

inline auto release() noexcept -> enum state { return this->is = released; }

inline auto state() const noexcept -> enum state { return this->is; }

#endif
};

} /* namespace detail::wtr::watcher */
Loading

0 comments on commit 3b7356b

Please sign in to comment.