From 02d7b589703cce72ae1270bd92ee7126cc47f71f Mon Sep 17 00:00:00 2001 From: John Hawthorn Date: Wed, 6 Dec 2023 16:57:03 -0800 Subject: [PATCH] Profile based on Ruby thread rather than native thread (#46) * Record samples by Ruby thread not native thread Previously we would collect samples, grouping them by native thread id. This worked relatively well, but was a somewhat arbitrary choice. Ruby 3.3 will have an M:N thread scheduler, which means that a single native thread may run multiple Ruby threads, and threads may migrate between native threads. With this change, profiles should be much more readable and useful when separated by Ruby thread. Co-authored-by: Aaron Patterson Co-authored-by: John Hawthorn Co-authored-by: Danny <46854315+DanFCo@users.noreply.github.com> --- exe/vernier | 2 + ext/vernier/extconf.rb | 5 + ext/vernier/vernier.cc | 257 +++++++++++++++++++++++++--------- lib/vernier/collector.rb | 4 +- lib/vernier/output/firefox.rb | 10 +- test/test_time_collector.rb | 6 +- 6 files changed, 208 insertions(+), 76 deletions(-) diff --git a/exe/vernier b/exe/vernier index 4b53364..5f0cb74 100755 --- a/exe/vernier +++ b/exe/vernier @@ -35,4 +35,6 @@ end vernier_path = File.expand_path('../lib', __dir__) env['RUBYOPT'] = "-I #{vernier_path} -r vernier/autorun #{ENV['RUBYOPT']}" +ARGV.unshift("gdb", "--args") + Kernel.exec(env, *ARGV) diff --git a/ext/vernier/extconf.rb b/ext/vernier/extconf.rb index b85940f..9d32315 100644 --- a/ext/vernier/extconf.rb +++ b/ext/vernier/extconf.rb @@ -5,4 +5,9 @@ $CXXFLAGS += " -std=c++14 " $CXXFLAGS += " -ggdb3 -Og " +have_header("ruby/thread.h") +have_struct_member("rb_internal_thread_event_data_t", "thread", ["ruby/thread.h"]) + +have_func("rb_profile_thread_frames", "ruby/debug.h") + create_makefile("vernier/vernier") diff --git a/ext/vernier/vernier.cc b/ext/vernier/vernier.cc index bbf3556..0aca7ab 100644 --- a/ext/vernier/vernier.cc +++ b/ext/vernier/vernier.cc @@ -1,3 +1,5 @@ +// vim: expandtab:ts=4:sw=4 + #include #include 
#include @@ -27,6 +29,9 @@ #include "ruby/debug.h" #include "ruby/thread.h" +#undef assert +#define assert RUBY_ASSERT_ALWAYS + # define PTR2NUM(x) (rb_int2inum((intptr_t)(void *)(x))) // Internal TracePoint events we'll monitor during profiling @@ -53,6 +58,22 @@ static VALUE rb_cVernierResult; static VALUE rb_mVernierMarkerType; static VALUE rb_cVernierCollector; +static const char *gvl_event_name(rb_event_flag_t event) { + switch (event) { + case RUBY_INTERNAL_THREAD_EVENT_STARTED: + return "started"; + case RUBY_INTERNAL_THREAD_EVENT_READY: + return "ready"; + case RUBY_INTERNAL_THREAD_EVENT_RESUMED: + return "resumed"; + case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED: + return "suspended"; + case RUBY_INTERNAL_THREAD_EVENT_EXITED: + return "exited"; + } + return "no-event"; +} + class TimeStamp { static const uint64_t nanoseconds_per_second = 1000000000; uint64_t value_ns; @@ -85,8 +106,16 @@ class TimeStamp { } while (target_time > TimeStamp::Now()); } + static TimeStamp from_seconds(uint64_t s) { + return TimeStamp::from_milliseconds(s * 1000); + } + + static TimeStamp from_milliseconds(uint64_t ms) { + return TimeStamp::from_microseconds(ms * 1000); + } + static TimeStamp from_microseconds(uint64_t us) { - return TimeStamp(us * 1000); + return TimeStamp::from_nanoseconds(us * 1000); } static TimeStamp from_nanoseconds(uint64_t ns) { @@ -266,6 +295,10 @@ class SamplerSemaphore { #ifdef __APPLE__ dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); #else + // Use sem_timedwait so that we get a crash instead of a deadlock for + // easier debugging + auto ts = (TimeStamp::Now() + TimeStamp::from_seconds(5)).timespec(); + int ret; do { ret = sem_wait(&sem); @@ -304,16 +337,15 @@ struct RawSample { } void sample() { + clear(); + if (!ruby_native_thread_p()) { - clear(); return; } if (rb_during_gc()) { gc = true; - len = 0; } else { - gc = false; len = rb_profile_frames(0, MAX_LEN, frames, lines); } } @@ -602,12 +634,13 @@ class Marker { Phase phase; TimeStamp 
timestamp; TimeStamp finish; - native_thread_id_t thread_id; + // VALUE ruby_thread_id; + //native_thread_id_t thread_id; int stack_index = -1; VALUE to_array() { VALUE record[6] = {0}; - record[0] = ULL2NUM(thread_id); + record[0] = Qnil; // FIXME record[1] = INT2NUM(type); record[2] = INT2NUM(phase); record[3] = ULL2NUM(timestamp.nanoseconds()); @@ -625,30 +658,33 @@ class Marker { }; class MarkerTable { - TimeStamp last_gc_entry; - public: std::vector list; std::mutex mutex; - void record_gc_entered() { - last_gc_entry = TimeStamp::Now(); - } - - void record_gc_leave() { - list.push_back({ Marker::MARKER_GC_PAUSE, Marker::INTERVAL, last_gc_entry, TimeStamp::Now(), get_native_thread_id(), -1 }); - } - void record_interval(Marker::Type type, TimeStamp from, TimeStamp to, int stack_index = -1) { const std::lock_guard lock(mutex); - list.push_back({ type, Marker::INTERVAL, from, to, get_native_thread_id(), stack_index }); + list.push_back({ type, Marker::INTERVAL, from, to, stack_index }); } void record(Marker::Type type, int stack_index = -1) { const std::lock_guard lock(mutex); - list.push_back({ type, Marker::INSTANT, TimeStamp::Now(), TimeStamp(), get_native_thread_id(), stack_index }); + list.push_back({ type, Marker::INSTANT, TimeStamp::Now(), TimeStamp(), stack_index }); + } +}; + +class GCMarkerTable: public MarkerTable { + TimeStamp last_gc_entry; + + public: + void record_gc_entered() { + last_gc_entry = TimeStamp::Now(); + } + + void record_gc_leave() { + list.push_back({ Marker::MARKER_GC_PAUSE, Marker::INTERVAL, last_gc_entry, TimeStamp::Now(), -1 }); } }; @@ -731,6 +767,8 @@ class Thread { STOPPED }; + VALUE ruby_thread; + VALUE ruby_thread_id; pthread_t pthread_id; native_thread_id_t native_tid; State state; @@ -742,18 +780,33 @@ class Thread { int stack_on_suspend_idx; SampleTranslator translator; - std::string name; + MarkerTable *markers; + + std::string name; - Thread(State state) : state(state), stack_on_suspend_idx(-1) { - pthread_id = 
pthread_self(); + // FIXME: don't use pthread at start + Thread(State state, pthread_t pthread_id, VALUE ruby_thread) : pthread_id(pthread_id), ruby_thread(ruby_thread), state(state), stack_on_suspend_idx(-1) { + name = Qnil; + ruby_thread_id = rb_obj_id(ruby_thread); + //ruby_thread_id = ULL2NUM(ruby_thread); native_tid = get_native_thread_id(); started_at = state_changed_at = TimeStamp::Now(); + name = ""; + markers = new MarkerTable(); + + if (state == State::STARTED) { + markers->record(Marker::Type::MARKER_GVL_THREAD_STARTED); + } } - void set_state(State new_state, MarkerTable *markers) { + void set_state(State new_state) { if (state == Thread::State::STOPPED) { return; } + if (new_state == Thread::State::SUSPENDED && state == new_state) { + // on Ruby 3.2 (only?) we may see duplicate suspended states + return; + } TimeStamp from = state_changed_at; auto now = TimeStamp::Now(); @@ -764,10 +817,13 @@ class Thread { switch (new_state) { case State::STARTED: - new_state = State::RUNNING; + markers->record(Marker::Type::MARKER_GVL_THREAD_STARTED); + return; // no mutation of current state break; case State::RUNNING: - assert(state == State::READY); + assert(state == State::READY || state == State::RUNNING); + pthread_id = pthread_self(); + native_tid = get_native_thread_id(); // If the GVL is immediately ready, and we measure no times // stalled, skip emitting the interval. 
@@ -783,23 +839,25 @@ class Thread { // Threads can be preempted, which means they will have been in "Running" // state, and then the VM was like "no I need to stop you from working, // so I'll put you in the 'ready' (or stalled) state" - assert(state == State::SUSPENDED || state == State::RUNNING); + assert(state == State::STARTED || state == State::SUSPENDED || state == State::RUNNING); if (state == State::SUSPENDED) { markers->record_interval(Marker::Type::MARKER_THREAD_SUSPENDED, from, now, stack_on_suspend_idx); } - else { + else if (state == State::RUNNING) { markers->record_interval(Marker::Type::MARKER_THREAD_RUNNING, from, now); } break; case State::SUSPENDED: // We can go from RUNNING or STARTED to SUSPENDED - assert(state == State::RUNNING || state == State::STARTED); + assert(state == State::RUNNING || state == State::STARTED || state == State::SUSPENDED); markers->record_interval(Marker::Type::MARKER_THREAD_RUNNING, from, now); break; case State::STOPPED: // We can go from RUNNING or STARTED to STOPPED assert(state == State::RUNNING || state == State::STARTED); markers->record_interval(Marker::Type::MARKER_THREAD_RUNNING, from, now); + markers->record(Marker::Type::MARKER_GVL_THREAD_EXITED); + stopped_at = now; capture_name(); @@ -815,10 +873,13 @@ class Thread { } void capture_name() { - char buf[128]; - int rc = pthread_getname_np(pthread_id, buf, sizeof(buf)); - if (rc == 0) - name = std::string(buf); + //char buf[128]; + //int rc = pthread_getname_np(pthread_id, buf, sizeof(buf)); + //if (rc == 0) + // name = std::string(buf); + } + + void mark() { } }; @@ -832,40 +893,49 @@ class ThreadTable { ThreadTable(FrameList &frame_list) : frame_list(frame_list) { } - void started(MarkerTable *markers) { - //const std::lock_guard lock(mutex); + void mark() { + for (auto &thread : list) { + thread.mark(); + } + } + void started(VALUE th) { //list.push_back(Thread{pthread_self(), Thread::State::SUSPENDED}); - 
markers->record(Marker::Type::MARKER_GVL_THREAD_STARTED); - set_state(Thread::State::STARTED, markers); + set_state(Thread::State::STARTED, th); } - void ready(MarkerTable *markers) { - set_state(Thread::State::READY, markers); + void ready(VALUE th) { + set_state(Thread::State::READY, th); } - void resumed(MarkerTable *markers) { - set_state(Thread::State::RUNNING, markers); + void resumed(VALUE th) { + set_state(Thread::State::RUNNING, th); } - void suspended(MarkerTable *markers) { - set_state(Thread::State::SUSPENDED, markers); + void suspended(VALUE th) { + set_state(Thread::State::SUSPENDED, th); } - void stopped(MarkerTable *markers) { - markers->record(Marker::Type::MARKER_GVL_THREAD_EXITED); - set_state(Thread::State::STOPPED, markers); + void stopped(VALUE th) { + set_state(Thread::State::STOPPED, th); } private: - void set_state(Thread::State new_state, MarkerTable *markers) { + void set_state(Thread::State new_state, VALUE th) { const std::lock_guard lock(mutex); - pthread_t current_thread = pthread_self(); //cerr << "set state=" << new_state << " thread=" << gettid() << endl; + pid_t native_tid = get_native_thread_id(); + pthread_t pthread_id = pthread_self(); + + //fprintf(stderr, "th %p (tid: %i) from %s to %s\n", (void *)th, native_tid, gvl_event_name(state), gvl_event_name(new_state)); + for (auto &thread : list) { - if (pthread_equal(current_thread, thread.pthread_id)) { + if (thread.pthread_id == pthread_id) { + thread.pthread_id = 0; + } + if (thread_equal(th, thread.ruby_thread)) { if (new_state == Thread::State::SUSPENDED) { RawSample sample; @@ -875,14 +945,27 @@ class ThreadTable { //cerr << gettid() << " suspended! 
Stack size:" << thread.stack_on_suspend.size() << endl; } - thread.set_state(new_state, markers); + thread.set_state(new_state); + + if (thread.state == Thread::State::RUNNING) { + thread.pthread_id = pthread_self(); + thread.native_tid = get_native_thread_id(); + } else { + thread.pthread_id = 0; + thread.native_tid = 0; + } + return; } } - pid_t native_tid = get_native_thread_id(); - list.emplace_back(new_state); + //fprintf(stderr, "NEW THREAD: th: %p, state: %i\n", th, new_state); + list.emplace_back(new_state, pthread_self(), th); + } + + bool thread_equal(VALUE a, VALUE b) { + return a == b; } }; @@ -1151,6 +1234,10 @@ class GlobalSignalHandler { void record_sample(LiveSample &sample, pthread_t pthread_id) { const std::lock_guard lock(mutex); + if (!pthread_id) { + abort(); + } + live_sample = &sample; if (pthread_kill(pthread_id, SIGPROF)) { rb_bug("pthread_kill failed"); @@ -1187,7 +1274,7 @@ class GlobalSignalHandler { LiveSample *GlobalSignalHandler::live_sample; class TimeCollector : public BaseCollector { - MarkerTable markers; + GCMarkerTable gc_markers; ThreadTable threads; pthread_t sample_thread; @@ -1216,10 +1303,22 @@ class TimeCollector : public BaseCollector { } VALUE get_markers() { - VALUE list = rb_ary_new2(this->markers.list.size()); + VALUE list = rb_ary_new(); + VALUE main_thread = rb_thread_main(); + VALUE main_thread_id = rb_obj_id(main_thread); + + for (auto& marker: this->gc_markers.list) { + VALUE ary = marker.to_array(); - for (auto& marker: this->markers.list) { - rb_ary_push(list, marker.to_array()); + RARRAY_ASET(ary, 0, main_thread_id); + rb_ary_push(list, ary); + } + for (auto &thread : threads.list) { + for (auto& marker: thread.markers->list) { + VALUE ary = marker.to_array(); + RARRAY_ASET(ary, 0, thread.ruby_thread_id); + rb_ary_push(list, ary); + } } return list; @@ -1235,7 +1334,9 @@ class TimeCollector : public BaseCollector { threads.mutex.lock(); for (auto &thread : threads.list) { //if (thread.state == 
Thread::State::RUNNING) { - if (thread.state == Thread::State::RUNNING || (thread.state == Thread::State::SUSPENDED && thread.stack_on_suspend_idx < 0)) { + //if (thread.state == Thread::State::RUNNING || (thread.state == Thread::State::SUSPENDED && thread.stack_on_suspend_idx < 0)) { + if (thread.state == Thread::State::RUNNING) { + //fprintf(stderr, "sampling %p on tid:%i\n", thread.ruby_thread, thread.native_tid); GlobalSignalHandler::get_instance()->record_sample(sample, thread.pthread_id); if (sample.sample.gc) { @@ -1281,10 +1382,10 @@ class TimeCollector : public BaseCollector { switch (event) { case RUBY_EVENT_THREAD_BEGIN: - collector->threads.started(&collector->markers); + collector->threads.started(self); break; case RUBY_EVENT_THREAD_END: - collector->threads.stopped(&collector->markers); + collector->threads.stopped(self); break; } } @@ -1294,36 +1395,57 @@ class TimeCollector : public BaseCollector { switch (event) { case RUBY_INTERNAL_EVENT_GC_START: - collector->markers.record(Marker::Type::MARKER_GC_START); + collector->gc_markers.record(Marker::Type::MARKER_GC_START); break; case RUBY_INTERNAL_EVENT_GC_END_MARK: - collector->markers.record(Marker::Type::MARKER_GC_END_MARK); + collector->gc_markers.record(Marker::Type::MARKER_GC_END_MARK); break; case RUBY_INTERNAL_EVENT_GC_END_SWEEP: - collector->markers.record(Marker::Type::MARKER_GC_END_SWEEP); + collector->gc_markers.record(Marker::Type::MARKER_GC_END_SWEEP); break; case RUBY_INTERNAL_EVENT_GC_ENTER: - collector->markers.record_gc_entered(); + collector->gc_markers.record_gc_entered(); break; case RUBY_INTERNAL_EVENT_GC_EXIT: - collector->markers.record_gc_leave(); + collector->gc_markers.record_gc_leave(); break; } } static void internal_thread_event_cb(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *data) { TimeCollector *collector = static_cast(data); + VALUE thread = Qnil; + +#if HAVE_RB_INTERNAL_THREAD_EVENT_DATA_T_THREAD + thread = event_data->thread; 
+#else + // We may arrive here when starting a thread with + // RUBY_INTERNAL_THREAD_EVENT_READY before the thread is actually set up. + if (!ruby_native_thread_p()) return; + + thread = rb_thread_current(); +#endif + + auto native_tid = get_native_thread_id(); //cerr << "internal thread event" << event << " at " << TimeStamp::Now() << endl; + //fprintf(stderr, "(%i) th %p to %s\n", native_tid, (void *)thread, gvl_event_name(event)); + switch (event) { + case RUBY_INTERNAL_THREAD_EVENT_STARTED: + collector->threads.started(thread); + break; + case RUBY_INTERNAL_THREAD_EVENT_EXITED: + collector->threads.stopped(thread); + break; case RUBY_INTERNAL_THREAD_EVENT_READY: - collector->threads.ready(&collector->markers); + collector->threads.ready(thread); break; case RUBY_INTERNAL_THREAD_EVENT_RESUMED: - collector->threads.resumed(&collector->markers); + collector->threads.resumed(thread); break; case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED: - collector->threads.suspended(&collector->markers); + collector->threads.suspended(thread); break; } @@ -1351,7 +1473,7 @@ class TimeCollector : public BaseCollector { // have at least one thread in our thread list because it's possible // that the profile might be such that we don't get any thread switch // events and we need at least one - this->threads.resumed(&this->markers); + this->threads.resumed(rb_thread_current()); thread_hook = rb_internal_thread_add_event_hook(internal_thread_event_cb, RUBY_INTERNAL_THREAD_EVENT_MASK, this); rb_add_event_hook(internal_gc_event_cb, RUBY_INTERNAL_EVENTS, PTR2NUM((void *)this)); @@ -1398,7 +1520,7 @@ class TimeCollector : public BaseCollector { VALUE hash = rb_hash_new(); thread.samples.write_result(hash); - rb_hash_aset(threads, ULL2NUM(thread.native_tid), hash); + rb_hash_aset(threads, thread.ruby_thread_id, hash); rb_hash_aset(hash, sym("tid"), ULL2NUM(thread.native_tid)); rb_hash_aset(hash, sym("started_at"), ULL2NUM(thread.started_at.nanoseconds())); if (!thread.stopped_at.zero()) { @@ 
-1415,6 +1537,7 @@ class TimeCollector : public BaseCollector { void mark() { frame_list.mark_frames(); + threads.mark(); //for (int i = 0; i < queued_length; i++) { // rb_gc_mark(queued_frames[i]); diff --git a/lib/vernier/collector.rb b/lib/vernier/collector.rb index 0f5c38c..9ad944f 100644 --- a/lib/vernier/collector.rb +++ b/lib/vernier/collector.rb @@ -19,7 +19,7 @@ def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond) end - def add_marker(name:, start:, finish:, thread: Thread.current.native_thread_id, phase: Marker::Phase::INTERVAL, data: nil) + def add_marker(name:, start:, finish:, thread: Thread.current.object_id, phase: Marker::Phase::INTERVAL, data: nil) @markers << [thread, name, start, @@ -39,7 +39,7 @@ def record_interval(category, name = category) start:, finish: current_time, phase: Marker::Phase::INTERVAL, - thread: Thread.current.native_thread_id, + thread: Thread.current.object_id, data: { :type => 'UserTiming', :entryType => 'measure', :name => name } ) end diff --git a/lib/vernier/output/firefox.rb b/lib/vernier/output/firefox.rb index 8744fca..c83545b 100644 --- a/lib/vernier/output/firefox.rb +++ b/lib/vernier/output/firefox.rb @@ -99,9 +99,10 @@ def output def data markers_by_thread = profile.markers.group_by { |marker| marker[0] } - thread_data = profile.threads.map do |tid, thread_info| - markers = markers_by_thread[tid] || [] + thread_data = profile.threads.map do |ruby_thread_id, thread_info| + markers = markers_by_thread[ruby_thread_id] || [] Thread.new( + ruby_thread_id, profile, @categorizer, markers: markers, @@ -157,7 +158,8 @@ def marker_schema class Thread attr_reader :profile - def initialize(profile, categorizer, name:, tid:, samples:, weights:, timestamps: nil, sample_categories: nil, markers:, started_at:, stopped_at: nil) + def initialize(ruby_thread_id, profile, categorizer, name:, tid:, samples:, weights:, timestamps: nil, sample_categories: nil, markers:, started_at:, stopped_at: nil) + 
@ruby_thread_id = ruby_thread_id @profile = profile @categorizer = categorizer @tid = tid @@ -212,7 +214,7 @@ def initialize(profile, categorizer, name:, tid:, samples:, weights:, timestamps def data { name: @name, - isMainThread: (@tid == ::Thread.main.native_thread_id) || (profile.threads.size == 1), + isMainThread: @ruby_thread_id == ::Thread.main.object_id || (profile.threads.size == 1), processStartupTime: 0, # FIXME processShutdownTime: nil, # FIXME registerTime: (@started_at - 0) / 1_000_000.0, diff --git a/test/test_time_collector.rb b/test/test_time_collector.rb index 8988f06..53fb0d5 100644 --- a/test/test_time_collector.rb +++ b/test/test_time_collector.rb @@ -49,8 +49,8 @@ def test_time_collector def test_sleeping_threads collector = Vernier::Collector.new(:wall, interval: SAMPLE_SCALE_INTERVAL) - th1 = Thread.new { two_slow_methods; Thread.current.native_thread_id } - th2 = Thread.new { two_slow_methods; Thread.current.native_thread_id } + th1 = Thread.new { two_slow_methods; Thread.current.object_id } + th2 = Thread.new { two_slow_methods; Thread.current.object_id } collector.start th1id = th1.value th2id = th2.value @@ -61,7 +61,7 @@ def test_sleeping_threads thread[:weights].sum end.to_h - assert_in_epsilon 200, tally[Thread.current.native_thread_id], generous_epsilon + assert_in_epsilon 200, tally[Thread.current.object_id], generous_epsilon assert_in_epsilon 200, tally[th1id], generous_epsilon assert_in_epsilon 200, tally[th2id], generous_epsilon