Skip to content

Commit

Permalink
watcher/darwin: put back atomics
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Nov 26, 2023
1 parent b8d39d9 commit fd6feb1
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 176 deletions.
62 changes: 39 additions & 23 deletions devel/include/detail/wtr/watcher/adapter/darwin/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <limits>
#include <random>
#include <string>
#include <thread>
#include <tuple>
#include <unistd.h>
#include <unordered_set>
Expand Down Expand Up @@ -239,11 +240,11 @@ static_assert(FSEventStreamCallback{event_recv} == event_recv);

inline auto open_event_stream(
std::filesystem::path const& path,
::wtr::watcher::event::callback const& callback,
dispatch_queue_t queue) noexcept
::wtr::watcher::event::callback const& callback) noexcept
-> std::tuple<bool, std::shared_ptr<sysres_type>>
{
using namespace std::chrono_literals;
using std::chrono::duration_cast, std::chrono::seconds;

auto sysres =
std::make_shared<sysres_type>(sysres_type{nullptr, argptr_type{callback}});
Expand Down Expand Up @@ -293,30 +294,38 @@ inline auto open_event_stream(
kernel. The event stream will call `event_recv` with
`context` and some details about each filesystem event
the kernel sees for the paths in `path_array`. */
FSEventStreamRef stream = FSEventStreamCreate(
/* A custom allocator is optional */
nullptr,
/* A callable to invoke on changes */
&event_recv,
/* The callable's arguments (context) */
&context,
/* The path(s) we were asked to watch */
path_array,
/* The time "since when" we receive events */
kFSEventStreamEventIdSinceNow,
/* The time between scans *after inactivity* */
(0.016s).count(),
/* The event stream flags */
fsev_flag_listen);

if (stream && queue) {
FSEventStreamSetDispatchQueue(stream, queue);
if (
FSEventStreamRef stream = FSEventStreamCreate(
/* A custom allocator is optional */
nullptr,
/* A callable to invoke on changes */
&event_recv,
/* The callable's arguments (context) */
&context,
/* The path(s) we were asked to watch */
path_array,
/* The time "since when" we receive events */
kFSEventStreamEventIdSinceNow,
/* The time between scans *after inactivity* */
(0.016s).count(),
/* The event stream flags */
fsev_flag_listen)) {
FSEventStreamSetDispatchQueue(
stream,
/* We don't need to retain, maintain or release this
dispatch queue. It's a global system queue, and it
outlives us. */
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));

FSEventStreamStart(stream);

sysres->stream = stream;

/* todo: Do we need to release these?
CFRelease(path_cfstring);
CFRelease(path_array);
*/

return {true, sysres};
}
else {
Expand Down Expand Up @@ -349,16 +358,23 @@ close_event_stream(std::shared_ptr<sysres_type> const& sysres) noexcept -> bool
return false;
}

inline auto block_while(semabin const& is_living)
{
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

while (is_living.state() == semabin::state::pending) sleep_for(16ms);
}

} /* namespace */

inline auto watch(
std::filesystem::path const& path,
::wtr::watcher::event::callback const& callback,
semabin const& is_living) noexcept -> bool
{
auto queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
auto&& [ok, sysres] = open_event_stream(path, callback, queue);
return ok && is_living.block_on(queue) && close_event_stream(sysres);
auto&& [ok, sysres] = open_event_stream(path, callback);
return ok ? (block_while(is_living), close_event_stream(sysres)) : false;
}

} /* namespace adapter */
Expand Down
72 changes: 7 additions & 65 deletions devel/include/detail/wtr/watcher/semabin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,17 @@
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>
#elif defined(__APPLE__)
#include <dispatch/dispatch.h>
#endif

namespace detail::wtr::watcher {

/* A semaphore-like construct which can be
used with poll and friends on Linux and
dispatch on Apple. This class is emulates
a binary semaphore; An atomic boolean flag.
On other platforms, this is an atomic flag
which can be checked in a sleep, wake loop,
ideally with a generous sleep time.
/* A semaphore which is pollable on Linux.
On all platforms, this behaves like an
atomic boolean flag. (On non-Linux, this
literally is an atomic boolean flag.)
On Linux, this is an eventfd in semaphore
mode. The file descriptor is exposed for
use with poll and friends.
On macOS, this is a dispatch_semaphore_t,
which can be scheduled with dispatch.
*/
mode, which means that it can be waited on
with poll() and friends. */

class semabin {
public:
Expand All @@ -39,7 +28,7 @@ class semabin {
mutable std::atomic<state> is = pending;

public:
#if defined(__linux__)
#ifdef __linux__

int const fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);

Expand Down Expand Up @@ -80,53 +69,6 @@ class semabin {

inline ~semabin() noexcept { close(this->fd); }

#elif defined(__APPLE__)

dispatch_semaphore_t const sema = []()
{ return dispatch_semaphore_create(0); }();

inline auto release() noexcept -> state
{
auto write_ev = [this]()
{
if (dispatch_semaphore_signal(this->sema) == 1)
return released;
else
return error;
};

if (this->is == released)
return released;
else
return this->is = write_ev();
}

inline auto state() const noexcept -> state
{
auto read_ev = [this]()
{
if (dispatch_semaphore_wait(this->sema, DISPATCH_TIME_NOW) == 0)
return released;
else
return pending;
};

if (this->is == pending)
return pending;
else
return this->is = read_ev();
}

inline auto block_on(dispatch_queue_t queue) const noexcept -> bool
{
dispatch_sync(queue, ^{
dispatch_semaphore_wait(this->sema, DISPATCH_TIME_FOREVER);
});
return this->state() == released;
}

inline ~semabin() noexcept { dispatch_release(this->sema); }

#else

inline auto release() noexcept -> enum state { return this->is = released; }
Expand Down
Loading

0 comments on commit fd6feb1

Please sign in to comment.