Skip to content

Commit

Permalink
Merge pull request #113 from jhawthorn/periodic_thread
Browse files Browse the repository at this point in the history
Use PeriodicThread for sampling
  • Loading branch information
jhawthorn authored Dec 11, 2024
2 parents 243c9d4 + f8b7793 commit abfdeed
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 192 deletions.
1 change: 1 addition & 0 deletions ext/vernier/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
have_func("rb_profile_thread_frames", "ruby/debug.h")

have_func("pthread_setname_np")
have_func("pthread_condattr_setclock")

create_makefile("vernier/vernier")
61 changes: 4 additions & 57 deletions ext/vernier/memory.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include <atomic>
#include <mutex>
#include <stdio.h>
#include <unistd.h>
#include <vector>

#include "vernier.hh"
#include "timestamp.hh"
#include "periodic_thread.hh"

#if defined(__APPLE__)

Expand Down Expand Up @@ -60,62 +60,6 @@ static VALUE rb_memory_rss(VALUE self) {
return ULL2NUM(memory_rss());
}

class PeriodicThread {
std::atomic<bool> running;
pthread_t pthread;
TimeStamp interval;

public:
PeriodicThread() : interval(TimeStamp::from_milliseconds(10)) {
}

void set_interval(TimeStamp timestamp) {
interval = timestamp;
}

static void *thread_entrypoint(void *arg) {
static_cast<PeriodicThread *>(arg)->run();
return NULL;
}

void run() {
TimeStamp next_sample_schedule = TimeStamp::Now();
while (running) {
TimeStamp sample_complete = TimeStamp::Now();

run_iteration();

next_sample_schedule += interval;

if (next_sample_schedule < sample_complete) {
next_sample_schedule = sample_complete + interval;
}

TimeStamp::SleepUntil(next_sample_schedule);
}
}

virtual void run_iteration() = 0;

void start() {
if (running) return;

running = true;

int ret = pthread_create(&pthread, NULL, &thread_entrypoint, this);
if (ret != 0) {
perror("pthread_create");
rb_bug("VERNIER: pthread_create failed");
}
}

void stop() {
if (!running) return;

running = false;
}
};

class MemoryTracker : public PeriodicThread {
public:
struct Record {
Expand All @@ -125,6 +69,9 @@ class MemoryTracker : public PeriodicThread {
std::vector<Record> results;
std::mutex mutex;

MemoryTracker() : PeriodicThread(TimeStamp::from_milliseconds(10)) {
}

void run_iteration() {
record();
}
Expand Down
141 changes: 141 additions & 0 deletions ext/vernier/periodic_thread.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include "ruby.h"

#include <atomic>
#include "timestamp.hh"

#ifdef __APPLE__

#include <mach/mach.h>
#include <mach/mach_time.h>
#include <pthread.h>

// https://developer.apple.com/library/archive/technotes/tn2169/_index.html
inline void upgrade_thread_priority(pthread_t pthread) {
mach_timebase_info_data_t timebase_info;
mach_timebase_info(&timebase_info);

const uint64_t NANOS_PER_MSEC = 1000000ULL;
double clock2abs = ((double)timebase_info.denom / (double)timebase_info.numer) * NANOS_PER_MSEC;

thread_time_constraint_policy_data_t policy;
policy.period = 0;

// FIXME: I really don't know what these value should be
policy.computation = (uint32_t)(5 * clock2abs); // 5 ms of work
policy.constraint = (uint32_t)(10 * clock2abs);
policy.preemptible = FALSE;

int kr = thread_policy_set(pthread_mach_thread_np(pthread_self()),
THREAD_TIME_CONSTRAINT_POLICY,
(thread_policy_t)&policy,
THREAD_TIME_CONSTRAINT_POLICY_COUNT);

if (kr != KERN_SUCCESS) {
mach_error("thread_policy_set:", kr);
exit(1);
}
}
#else
inline void upgrade_thread_priority(pthread_t pthread) {
}
#endif

class PeriodicThread {
pthread_t pthread;
TimeStamp interval;

pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t running_cv;
std::atomic_bool running;

public:
PeriodicThread(TimeStamp interval) : interval(interval), running(false) {
pthread_condattr_t attr;
pthread_condattr_init(&attr);
#if HAVE_PTHREAD_CONDATTR_SETCLOCK
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
#endif
pthread_cond_init(&running_cv, &attr);
}

void set_interval(TimeStamp timestamp) {
interval = timestamp;
}

static void *thread_entrypoint(void *arg) {
upgrade_thread_priority(pthread_self());

static_cast<PeriodicThread *>(arg)->run();
return NULL;
}

void run() {
#if HAVE_PTHREAD_SETNAME_NP
#ifdef __APPLE__
pthread_setname_np("Vernier profiler");
#else
pthread_setname_np(pthread_self(), "Vernier profiler");
#endif
#endif

TimeStamp next_sample_schedule = TimeStamp::Now();
bool done = false;
while (!done) {
TimeStamp sample_complete = TimeStamp::Now();

run_iteration();

next_sample_schedule += interval;

if (next_sample_schedule < sample_complete) {
next_sample_schedule = sample_complete + interval;
}

pthread_mutex_lock(&running_mutex);
if (running) {
#if HAVE_PTHREAD_CONDATTR_SETCLOCK
struct timespec next_sample_ts = next_sample_schedule.timespec();
#else
auto offset = TimeStamp::NowRealtime() - TimeStamp::Now();
struct timespec next_sample_ts = (next_sample_schedule + offset).timespec();
#endif
int ret;
do {
ret = pthread_cond_timedwait(&running_cv, &running_mutex, &next_sample_ts);
} while(running && ret == EINTR);
}
done = !running;
pthread_mutex_unlock(&running_mutex);
}
}

virtual void run_iteration() = 0;

void start() {
pthread_mutex_lock(&running_mutex);
if (!running) {
running = true;

int ret = pthread_create(&pthread, NULL, &thread_entrypoint, this);
if (ret != 0) {
perror("pthread_create");
rb_bug("VERNIER: pthread_create failed");
}
}
pthread_mutex_unlock(&running_mutex);
}

void stop() {
pthread_mutex_lock(&running_mutex);
bool was_running = running;
if (running) {
running = false;
pthread_cond_broadcast(&running_cv);
}
pthread_mutex_unlock(&running_mutex);
if (was_running)
pthread_join(pthread, NULL);
pthread = 0;
}
};

72 changes: 72 additions & 0 deletions ext/vernier/signal_safe_semaphore.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#ifndef SIGNAL_SAFE_SEMAPHORE_HH
#define SIGNAL_SAFE_SEMAPHORE_HH

#if defined(__APPLE__)
/* macOS */
#include <dispatch/dispatch.h>
#elif defined(__FreeBSD__)
/* FreeBSD */
#include <pthread_np.h>
#include <semaphore.h>
#else
/* Linux */
#include <semaphore.h>
#include <sys/syscall.h> /* for SYS_gettid */
#endif

// A basic semaphore built on sem_wait/sem_post
// post() is guaranteed to be async-signal-safe
class SignalSafeSemaphore {
#ifdef __APPLE__
dispatch_semaphore_t sem;
#else
sem_t sem;
#endif

public:

SignalSafeSemaphore(unsigned int value = 0) {
#ifdef __APPLE__
sem = dispatch_semaphore_create(value);
#else
sem_init(&sem, 0, value);
#endif
};

~SignalSafeSemaphore() {
#ifdef __APPLE__
dispatch_release(sem);
#else
sem_destroy(&sem);
#endif
};

void wait() {
#ifdef __APPLE__
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
#else
// Use sem_timedwait so that we get a crash instead of a deadlock for
// easier debugging
struct timespec ts = (TimeStamp::NowRealtime() + TimeStamp::from_seconds(5)).timespec();

int ret;
do {
ret = sem_timedwait(&sem, &ts);
} while (ret && errno == EINTR);
if (ret != 0) {
rb_bug("VERNIER: sem_timedwait waited over 5 seconds");
}
assert(ret == 0);
#endif
}

void post() {
#ifdef __APPLE__
dispatch_semaphore_signal(sem);
#else
sem_post(&sem);
#endif
}
};

#endif
6 changes: 6 additions & 0 deletions ext/vernier/timestamp.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ class TimeStamp {
return TimeStamp(ts.tv_sec * nanoseconds_per_second + ts.tv_nsec);
}

static TimeStamp NowRealtime() {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return TimeStamp(ts.tv_sec * nanoseconds_per_second + ts.tv_nsec);
}

static TimeStamp Zero() {
return TimeStamp(0);
}
Expand Down
Loading

0 comments on commit abfdeed

Please sign in to comment.