зеркало из https://github.com/github/ruby.git
Refactor and fix the GVL instrumentation API
This entirely changes how it is tested. Rather than to use counters we now record the timeline of events with associated threads which makes it much easier to assert that certains events are only preceded by a specific event, and makes it much easier to debug unexpected timelines. Co-Authored-By: Étienne Barrié <etienne.barrie@gmail.com> Co-Authored-By: JP Camara <jp@jpcamara.com> Co-Authored-By: John Hawthorn <john@hawthorn.email>
This commit is contained in:
Родитель
e3875dd0f8
Коммит
23a7714343
|
@ -2,86 +2,137 @@
|
||||||
#include "ruby/atomic.h"
|
#include "ruby/atomic.h"
|
||||||
#include "ruby/thread.h"
|
#include "ruby/thread.h"
|
||||||
|
|
||||||
static rb_atomic_t started_count = 0;
|
|
||||||
static rb_atomic_t ready_count = 0;
|
|
||||||
static rb_atomic_t resumed_count = 0;
|
|
||||||
static rb_atomic_t suspended_count = 0;
|
|
||||||
static rb_atomic_t exited_count = 0;
|
|
||||||
|
|
||||||
#ifndef RB_THREAD_LOCAL_SPECIFIER
|
#ifndef RB_THREAD_LOCAL_SPECIFIER
|
||||||
# define RB_THREAD_LOCAL_SPECIFIER
|
# define RB_THREAD_LOCAL_SPECIFIER
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static RB_THREAD_LOCAL_SPECIFIER unsigned int local_ready_count = 0;
|
|
||||||
static RB_THREAD_LOCAL_SPECIFIER unsigned int local_resumed_count = 0;
|
|
||||||
static RB_THREAD_LOCAL_SPECIFIER unsigned int local_suspended_count = 0;
|
|
||||||
|
|
||||||
static VALUE last_thread = Qnil;
|
static VALUE last_thread = Qnil;
|
||||||
|
static VALUE timeline_value = Qnil;
|
||||||
|
|
||||||
|
struct thread_event {
|
||||||
|
VALUE thread;
|
||||||
|
rb_event_flag_t event;
|
||||||
|
};
|
||||||
|
|
||||||
|
#define MAX_EVENTS 1024
|
||||||
|
static struct thread_event event_timeline[MAX_EVENTS];
|
||||||
|
static rb_atomic_t timeline_cursor;
|
||||||
|
|
||||||
|
static void
|
||||||
|
event_timeline_gc_mark(void *ptr) {
|
||||||
|
rb_atomic_t cursor;
|
||||||
|
for (cursor = 0; cursor < timeline_cursor; cursor++) {
|
||||||
|
rb_gc_mark(event_timeline[cursor].thread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static const rb_data_type_t event_timeline_type = {
|
||||||
|
"TestThreadInstrumentation/event_timeline",
|
||||||
|
{event_timeline_gc_mark, NULL, NULL,},
|
||||||
|
0, 0,
|
||||||
|
RUBY_TYPED_FREE_IMMEDIATELY,
|
||||||
|
};
|
||||||
|
|
||||||
|
static void
|
||||||
|
reset_timeline(void)
|
||||||
|
{
|
||||||
|
timeline_cursor = 0;
|
||||||
|
memset(event_timeline, 0, sizeof(struct thread_event) * MAX_EVENTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
static rb_event_flag_t
|
||||||
|
find_last_event(VALUE thread)
|
||||||
|
{
|
||||||
|
rb_atomic_t cursor = timeline_cursor;
|
||||||
|
if (cursor) {
|
||||||
|
do {
|
||||||
|
if (event_timeline[cursor].thread == thread){
|
||||||
|
return event_timeline[cursor].event;
|
||||||
|
}
|
||||||
|
cursor--;
|
||||||
|
} while (cursor > 0);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const char *
|
||||||
|
event_name(rb_event_flag_t event)
|
||||||
|
{
|
||||||
|
switch (event) {
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
|
||||||
|
return "started";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_READY:
|
||||||
|
return "ready";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
|
||||||
|
return "resumed";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
|
||||||
|
return "suspended";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
|
||||||
|
return "exited";
|
||||||
|
}
|
||||||
|
return "no-event";
|
||||||
|
}
|
||||||
|
|
||||||
|
NORETURN(static void unexpected(const char *format, const char *event_name, VALUE thread));
|
||||||
|
|
||||||
|
static void
|
||||||
|
unexpected(const char *format, const char *event_name, VALUE thread)
|
||||||
|
{
|
||||||
|
#if 0
|
||||||
|
fprintf(stderr, "----------------\n");
|
||||||
|
fprintf(stderr, format, event_name, thread);
|
||||||
|
fprintf(stderr, "\n");
|
||||||
|
rb_backtrace();
|
||||||
|
fprintf(stderr, "----------------\n");
|
||||||
|
#else
|
||||||
|
rb_bug(format, event_name, thread);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
|
ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
|
||||||
{
|
{
|
||||||
|
rb_event_flag_t last_event = find_last_event(event_data->thread);
|
||||||
|
|
||||||
switch (event) {
|
switch (event) {
|
||||||
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
|
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
|
||||||
last_thread = event_data->thread;
|
if (last_event != 0) {
|
||||||
RUBY_ATOMIC_INC(started_count);
|
unexpected("TestThreadInstrumentation: `started` event can't be preceded by `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case RUBY_INTERNAL_THREAD_EVENT_READY:
|
case RUBY_INTERNAL_THREAD_EVENT_READY:
|
||||||
RUBY_ATOMIC_INC(ready_count);
|
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_STARTED && last_event != RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
|
||||||
local_ready_count++;
|
unexpected("TestThreadInstrumentation: `ready` must be preceded by `started` or `suspended`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
|
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
|
||||||
RUBY_ATOMIC_INC(resumed_count);
|
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_READY) {
|
||||||
local_resumed_count++;
|
unexpected("TestThreadInstrumentation: `resumed` must be preceded by `ready`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
|
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
|
||||||
RUBY_ATOMIC_INC(suspended_count);
|
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_RESUMED) {
|
||||||
local_suspended_count++;
|
unexpected("TestThreadInstrumentation: `suspended` must be preceded by `resumed`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
|
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
|
||||||
RUBY_ATOMIC_INC(exited_count);
|
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_RESUMED && last_event != RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
|
||||||
|
unexpected("TestThreadInstrumentation: `exited` must be preceded by `resumed` or `suspended`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rb_atomic_t cursor = RUBY_ATOMIC_FETCH_ADD(timeline_cursor, 1);
|
||||||
|
if (cursor >= MAX_EVENTS) {
|
||||||
|
rb_bug("TestThreadInstrumentation: ran out of event_timeline space");
|
||||||
|
}
|
||||||
|
|
||||||
|
event_timeline[cursor].thread = event_data->thread;
|
||||||
|
event_timeline[cursor].event = event;
|
||||||
}
|
}
|
||||||
|
|
||||||
static rb_internal_thread_event_hook_t * single_hook = NULL;
|
static rb_internal_thread_event_hook_t * single_hook = NULL;
|
||||||
|
|
||||||
static VALUE
|
|
||||||
thread_counters(VALUE thread)
|
|
||||||
{
|
|
||||||
VALUE array = rb_ary_new2(5);
|
|
||||||
rb_ary_push(array, UINT2NUM(started_count));
|
|
||||||
rb_ary_push(array, UINT2NUM(ready_count));
|
|
||||||
rb_ary_push(array, UINT2NUM(resumed_count));
|
|
||||||
rb_ary_push(array, UINT2NUM(suspended_count));
|
|
||||||
rb_ary_push(array, UINT2NUM(exited_count));
|
|
||||||
return array;
|
|
||||||
}
|
|
||||||
|
|
||||||
static VALUE
|
|
||||||
thread_local_counters(VALUE thread)
|
|
||||||
{
|
|
||||||
VALUE array = rb_ary_new2(3);
|
|
||||||
rb_ary_push(array, UINT2NUM(local_ready_count));
|
|
||||||
rb_ary_push(array, UINT2NUM(local_resumed_count));
|
|
||||||
rb_ary_push(array, UINT2NUM(local_suspended_count));
|
|
||||||
return array;
|
|
||||||
}
|
|
||||||
|
|
||||||
static VALUE
|
|
||||||
thread_reset_counters(VALUE thread)
|
|
||||||
{
|
|
||||||
RUBY_ATOMIC_SET(started_count, 0);
|
|
||||||
RUBY_ATOMIC_SET(ready_count, 0);
|
|
||||||
RUBY_ATOMIC_SET(resumed_count, 0);
|
|
||||||
RUBY_ATOMIC_SET(suspended_count, 0);
|
|
||||||
RUBY_ATOMIC_SET(exited_count, 0);
|
|
||||||
local_ready_count = 0;
|
|
||||||
local_resumed_count = 0;
|
|
||||||
local_suspended_count = 0;
|
|
||||||
return Qtrue;
|
|
||||||
}
|
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
thread_register_callback(VALUE thread)
|
thread_register_callback(VALUE thread)
|
||||||
{
|
{
|
||||||
|
@ -98,6 +149,26 @@ thread_register_callback(VALUE thread)
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
event_symbol(rb_event_flag_t event)
|
||||||
|
{
|
||||||
|
switch (event) {
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
|
||||||
|
return rb_id2sym(rb_intern("started"));
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_READY:
|
||||||
|
return rb_id2sym(rb_intern("ready"));
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
|
||||||
|
return rb_id2sym(rb_intern("resumed"));
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
|
||||||
|
return rb_id2sym(rb_intern("suspended"));
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
|
||||||
|
return rb_id2sym(rb_intern("exited"));
|
||||||
|
default:
|
||||||
|
rb_bug("TestThreadInstrumentation: Unexpected event");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
thread_unregister_callback(VALUE thread)
|
thread_unregister_callback(VALUE thread)
|
||||||
{
|
{
|
||||||
|
@ -106,7 +177,18 @@ thread_unregister_callback(VALUE thread)
|
||||||
single_hook = NULL;
|
single_hook = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Qnil;
|
VALUE events = rb_ary_new_capa(timeline_cursor);
|
||||||
|
rb_atomic_t cursor;
|
||||||
|
for (cursor = 0; cursor < timeline_cursor; cursor++) {
|
||||||
|
VALUE pair = rb_ary_new_capa(2);
|
||||||
|
rb_ary_push(pair, event_timeline[cursor].thread);
|
||||||
|
rb_ary_push(pair, event_symbol(event_timeline[cursor].event));
|
||||||
|
rb_ary_push(events, pair);
|
||||||
|
}
|
||||||
|
|
||||||
|
reset_timeline();
|
||||||
|
|
||||||
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
|
@ -125,31 +207,16 @@ thread_register_and_unregister_callback(VALUE thread)
|
||||||
return Qtrue;
|
return Qtrue;
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
|
||||||
thread_last_spawned(VALUE mod)
|
|
||||||
{
|
|
||||||
return last_thread;
|
|
||||||
}
|
|
||||||
|
|
||||||
static VALUE
|
|
||||||
thread_set_last_spawned(VALUE mod, VALUE value)
|
|
||||||
{
|
|
||||||
return last_thread = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
Init_instrumentation(void)
|
Init_instrumentation(void)
|
||||||
{
|
{
|
||||||
VALUE mBug = rb_define_module("Bug");
|
VALUE mBug = rb_define_module("Bug");
|
||||||
VALUE klass = rb_define_module_under(mBug, "ThreadInstrumentation");
|
VALUE klass = rb_define_module_under(mBug, "ThreadInstrumentation");
|
||||||
|
rb_global_variable(&timeline_value);
|
||||||
|
timeline_value = TypedData_Wrap_Struct(0, &event_timeline_type, 0);
|
||||||
|
|
||||||
rb_global_variable(&last_thread);
|
rb_global_variable(&last_thread);
|
||||||
rb_define_singleton_method(klass, "counters", thread_counters, 0);
|
|
||||||
rb_define_singleton_method(klass, "local_counters", thread_local_counters, 0);
|
|
||||||
rb_define_singleton_method(klass, "reset_counters", thread_reset_counters, 0);
|
|
||||||
rb_define_singleton_method(klass, "register_callback", thread_register_callback, 0);
|
rb_define_singleton_method(klass, "register_callback", thread_register_callback, 0);
|
||||||
rb_define_singleton_method(klass, "unregister_callback", thread_unregister_callback, 0);
|
rb_define_singleton_method(klass, "unregister_callback", thread_unregister_callback, 0);
|
||||||
rb_define_singleton_method(klass, "register_and_unregister_callbacks", thread_register_and_unregister_callback, 0);
|
rb_define_singleton_method(klass, "register_and_unregister_callbacks", thread_register_and_unregister_callback, 0);
|
||||||
|
|
||||||
rb_define_singleton_method(klass, "last_spawned_thread", thread_last_spawned, 0);
|
|
||||||
rb_define_singleton_method(klass, "last_spawned_thread=", thread_set_last_spawned, 1);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -243,9 +243,12 @@ typedef struct rb_internal_thread_event_hook rb_internal_thread_event_hook_t;
|
||||||
* @param[in] events A set of events that `func` should run.
|
* @param[in] events A set of events that `func` should run.
|
||||||
* @param[in] data Passed as-is to `func`.
|
* @param[in] data Passed as-is to `func`.
|
||||||
* @return An opaque pointer to the hook, to unregister it later.
|
* @return An opaque pointer to the hook, to unregister it later.
|
||||||
* @note This functionality is a noop on Windows.
|
* @note This functionality is a noop on Windows and WebAssembly.
|
||||||
* @note The callback will be called without the GVL held, except for the
|
* @note The callback will be called without the GVL held, except for the
|
||||||
* RESUMED event.
|
* RESUMED event.
|
||||||
|
* @note Callbacks are not guaranteed to be executed on the native threads
|
||||||
|
* that corresponds to the Ruby thread. To identify which Ruby thread
|
||||||
|
* the event refers to, you must use `event_data->thread`.
|
||||||
* @warning This function MUST not be called from a thread event callback.
|
* @warning This function MUST not be called from a thread event callback.
|
||||||
*/
|
*/
|
||||||
rb_internal_thread_event_hook_t *rb_internal_thread_add_event_hook(
|
rb_internal_thread_event_hook_t *rb_internal_thread_add_event_hook(
|
||||||
|
@ -258,7 +261,7 @@ rb_internal_thread_event_hook_t *rb_internal_thread_add_event_hook(
|
||||||
*
|
*
|
||||||
* @param[in] hook. The hook to unregister.
|
* @param[in] hook. The hook to unregister.
|
||||||
* @return Wether the hook was found and unregistered.
|
* @return Wether the hook was found and unregistered.
|
||||||
* @note This functionality is a noop on Windows.
|
* @note This functionality is a noop on Windows and WebAssembly.
|
||||||
* @warning This function MUST not be called from a thread event callback.
|
* @warning This function MUST not be called from a thread event callback.
|
||||||
*/
|
*/
|
||||||
bool rb_internal_thread_remove_event_hook(
|
bool rb_internal_thread_remove_event_hook(
|
||||||
|
|
|
@ -7,75 +7,200 @@ class TestThreadInstrumentation < Test::Unit::TestCase
|
||||||
|
|
||||||
require '-test-/thread/instrumentation'
|
require '-test-/thread/instrumentation'
|
||||||
|
|
||||||
Thread.list.each do |thread|
|
cleanup_threads
|
||||||
if thread != Thread.current
|
|
||||||
thread.kill
|
|
||||||
thread.join rescue nil
|
|
||||||
end
|
|
||||||
end
|
|
||||||
assert_equal [Thread.current], Thread.list
|
|
||||||
|
|
||||||
Bug::ThreadInstrumentation.reset_counters
|
|
||||||
Bug::ThreadInstrumentation::register_callback
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
return if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM
|
return if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM
|
||||||
Bug::ThreadInstrumentation::unregister_callback
|
Bug::ThreadInstrumentation.unregister_callback
|
||||||
Bug::ThreadInstrumentation.last_spawned_thread = nil
|
cleanup_threads
|
||||||
end
|
end
|
||||||
|
|
||||||
THREADS_COUNT = 3
|
THREADS_COUNT = 3
|
||||||
|
|
||||||
def test_thread_instrumentation
|
def test_single_thread_timeline
|
||||||
threads = threaded_cpu_work
|
thread = nil
|
||||||
assert_equal [false] * THREADS_COUNT, threads.map(&:status)
|
full_timeline = record do
|
||||||
counters = Bug::ThreadInstrumentation.counters
|
thread = Thread.new { 1 + 1 }
|
||||||
assert_join_counters(counters)
|
thread.join
|
||||||
assert_global_join_counters(counters)
|
end
|
||||||
|
assert_equal %i(started ready resumed suspended exited), timeline_for(thread, full_timeline)
|
||||||
|
ensure
|
||||||
|
thread&.kill
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_join_counters # Bug #18900
|
def test_muti_thread_timeline
|
||||||
thr = Thread.new { fib(30) }
|
threads = nil
|
||||||
Bug::ThreadInstrumentation.reset_counters
|
full_timeline = record do
|
||||||
thr.join
|
threads = threaded_cpu_work
|
||||||
assert_join_counters(Bug::ThreadInstrumentation.local_counters)
|
fib(20)
|
||||||
|
assert_equal [false] * THREADS_COUNT, threads.map(&:status)
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
threads.each do |thread|
|
||||||
|
timeline = timeline_for(thread, full_timeline)
|
||||||
|
assert_consistent_timeline(timeline)
|
||||||
|
end
|
||||||
|
|
||||||
|
timeline = timeline_for(Thread.current, full_timeline)
|
||||||
|
assert_consistent_timeline(timeline)
|
||||||
|
ensure
|
||||||
|
threads&.each(&:kill)
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_join_suspends # Bug #18900
|
||||||
|
thread = other_thread = nil
|
||||||
|
full_timeline = record do
|
||||||
|
other_thread = Thread.new { sleep 0.3 }
|
||||||
|
thread = Thread.new { other_thread.join }
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
|
|
||||||
|
timeline = timeline_for(thread, full_timeline)
|
||||||
|
assert_consistent_timeline(timeline)
|
||||||
|
assert_equal %i(started ready resumed suspended ready resumed suspended exited), timeline
|
||||||
|
ensure
|
||||||
|
other_thread&.kill
|
||||||
|
thread&.kill
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_io_release_gvl
|
||||||
|
r, w = IO.pipe
|
||||||
|
thread = nil
|
||||||
|
full_timeline = record do
|
||||||
|
thread = Thread.new do
|
||||||
|
w.write("Hello\n")
|
||||||
|
end
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
|
|
||||||
|
timeline = timeline_for(thread, full_timeline)
|
||||||
|
assert_consistent_timeline(timeline)
|
||||||
|
assert_equal %i(started ready resumed suspended ready resumed suspended exited), timeline
|
||||||
|
ensure
|
||||||
|
r&.close
|
||||||
|
w&.close
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_queue_releases_gvl
|
||||||
|
queue1 = Queue.new
|
||||||
|
queue2 = Queue.new
|
||||||
|
|
||||||
|
thread = nil
|
||||||
|
|
||||||
|
full_timeline = record do
|
||||||
|
thread = Thread.new do
|
||||||
|
queue1 << true
|
||||||
|
queue2.pop
|
||||||
|
end
|
||||||
|
|
||||||
|
queue1.pop
|
||||||
|
queue2 << true
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
|
|
||||||
|
timeline = timeline_for(thread, full_timeline)
|
||||||
|
assert_consistent_timeline(timeline)
|
||||||
|
assert_equal %i(started ready resumed suspended ready resumed suspended exited), timeline
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_thread_blocked_forever
|
||||||
|
mutex = Mutex.new
|
||||||
|
mutex.lock
|
||||||
|
thread = nil
|
||||||
|
|
||||||
|
full_timeline = record do
|
||||||
|
thread = Thread.new do
|
||||||
|
mutex.lock
|
||||||
|
end
|
||||||
|
10.times { Thread.pass }
|
||||||
|
sleep 0.1
|
||||||
|
end
|
||||||
|
|
||||||
|
mutex.unlock
|
||||||
|
thread.join
|
||||||
|
|
||||||
|
timeline = timeline_for(thread, full_timeline)
|
||||||
|
assert_consistent_timeline(timeline)
|
||||||
|
assert_equal %i(started ready resumed), timeline
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_thread_instrumentation_fork_safe
|
def test_thread_instrumentation_fork_safe
|
||||||
skip "No fork()" unless Process.respond_to?(:fork)
|
skip "No fork()" unless Process.respond_to?(:fork)
|
||||||
|
|
||||||
thread_statuses = counters = nil
|
thread_statuses = full_timeline = nil
|
||||||
IO.popen("-") do |read_pipe|
|
IO.popen("-") do |read_pipe|
|
||||||
if read_pipe
|
if read_pipe
|
||||||
thread_statuses = Marshal.load(read_pipe)
|
thread_statuses = Marshal.load(read_pipe)
|
||||||
counters = Marshal.load(read_pipe)
|
full_timeline = Marshal.load(read_pipe)
|
||||||
else
|
else
|
||||||
Bug::ThreadInstrumentation.reset_counters
|
|
||||||
threads = threaded_cpu_work
|
threads = threaded_cpu_work
|
||||||
Marshal.dump(threads.map(&:status), STDOUT)
|
Marshal.dump(threads.map(&:status), STDOUT)
|
||||||
Marshal.dump(Bug::ThreadInstrumentation.counters, STDOUT)
|
full_timeline = Bug::ThreadInstrumentation.unregister_callback.map { |t, e| [t.to_s, e ] }
|
||||||
|
Marshal.dump(full_timeline, STDOUT)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
assert_predicate $?, :success?
|
assert_predicate $?, :success?
|
||||||
|
|
||||||
assert_equal [false] * THREADS_COUNT, thread_statuses
|
assert_equal [false] * THREADS_COUNT, thread_statuses
|
||||||
assert_join_counters(counters)
|
thread_names = full_timeline.map(&:first).uniq
|
||||||
assert_global_join_counters(counters)
|
thread_names.each do |thread_name|
|
||||||
|
assert_consistent_timeline(timeline_for(thread_name, full_timeline))
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_thread_instrumentation_unregister
|
def test_thread_instrumentation_unregister
|
||||||
Bug::ThreadInstrumentation::unregister_callback
|
|
||||||
assert Bug::ThreadInstrumentation::register_and_unregister_callbacks
|
assert Bug::ThreadInstrumentation::register_and_unregister_callbacks
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_thread_instrumentation_event_data
|
private
|
||||||
assert_nil Bug::ThreadInstrumentation.last_spawned_thread
|
|
||||||
thr = Thread.new{ }.join
|
def record
|
||||||
assert_same thr, Bug::ThreadInstrumentation.last_spawned_thread
|
Bug::ThreadInstrumentation.register_callback
|
||||||
|
yield
|
||||||
|
ensure
|
||||||
|
timeline = Bug::ThreadInstrumentation.unregister_callback
|
||||||
|
if $!
|
||||||
|
raise
|
||||||
|
else
|
||||||
|
return timeline
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
def assert_consistent_timeline(events)
|
||||||
|
refute_predicate events, :empty?
|
||||||
|
|
||||||
|
previous_event = nil
|
||||||
|
events.each do |event|
|
||||||
|
refute_equal :exited, previous_event, "`exited` must be the final event: #{events.inspect}"
|
||||||
|
case event
|
||||||
|
when :started
|
||||||
|
assert_nil previous_event, "`started` must be the first event: #{events.inspect}"
|
||||||
|
when :ready
|
||||||
|
unless previous_event.nil?
|
||||||
|
assert %i(started suspended).include?(previous_event), "`ready` must be preceded by `started` or `suspended`: #{events.inspect}"
|
||||||
|
end
|
||||||
|
when :resumed
|
||||||
|
unless previous_event.nil?
|
||||||
|
assert_equal :ready, previous_event, "`resumed` must be preceded by `ready`: #{events.inspect}"
|
||||||
|
end
|
||||||
|
when :suspended
|
||||||
|
unless previous_event.nil?
|
||||||
|
assert_equal :resumed, previous_event, "`suspended` must be preceded by `resumed`: #{events.inspect}"
|
||||||
|
end
|
||||||
|
when :exited
|
||||||
|
unless previous_event.nil?
|
||||||
|
assert %i(resumed suspended).include?(previous_event), "`exited` must be preceded by `resumed` or `suspended`: #{events.inspect}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
previous_event = event
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def timeline_for(thread, timeline)
|
||||||
|
timeline.select { |t, _| t == thread }.map(&:last)
|
||||||
|
end
|
||||||
|
|
||||||
def fib(n = 20)
|
def fib(n = 20)
|
||||||
return n if n <= 1
|
return n if n <= 1
|
||||||
|
@ -86,13 +211,13 @@ class TestThreadInstrumentation < Test::Unit::TestCase
|
||||||
THREADS_COUNT.times.map { Thread.new { fib(size) } }.each(&:join)
|
THREADS_COUNT.times.map { Thread.new { fib(size) } }.each(&:join)
|
||||||
end
|
end
|
||||||
|
|
||||||
def assert_join_counters(counters)
|
def cleanup_threads
|
||||||
counters.each_with_index do |c, i|
|
Thread.list.each do |thread|
|
||||||
assert_operator c, :>, 0, "Call counters[#{i}]: #{counters.inspect}"
|
if thread != Thread.current
|
||||||
|
thread.kill
|
||||||
|
thread.join rescue nil
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
assert_equal [Thread.current], Thread.list
|
||||||
|
|
||||||
def assert_global_join_counters(counters)
|
|
||||||
assert_equal THREADS_COUNT, counters.first
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
4
thread.c
4
thread.c
|
@ -5406,10 +5406,6 @@ Init_Thread(void)
|
||||||
// it assumes blocked by thread_sched_to_waiting().
|
// it assumes blocked by thread_sched_to_waiting().
|
||||||
// thread_sched_to_running(sched, th);
|
// thread_sched_to_running(sched, th);
|
||||||
|
|
||||||
#ifdef RB_INTERNAL_THREAD_HOOK
|
|
||||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
th->pending_interrupt_queue = rb_ary_hidden_new(0);
|
th->pending_interrupt_queue = rb_ary_hidden_new(0);
|
||||||
th->pending_interrupt_queue_checked = 0;
|
th->pending_interrupt_queue_checked = 0;
|
||||||
th->pending_interrupt_mask_stack = rb_ary_hidden_new(0);
|
th->pending_interrupt_mask_stack = rb_ary_hidden_new(0);
|
||||||
|
|
|
@ -266,7 +266,34 @@ rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, u
|
||||||
|
|
||||||
static rb_internal_thread_event_hook_t *rb_internal_thread_event_hooks = NULL;
|
static rb_internal_thread_event_hook_t *rb_internal_thread_event_hooks = NULL;
|
||||||
static void rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th);
|
static void rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th);
|
||||||
#define RB_INTERNAL_THREAD_HOOK(event, th) if (rb_internal_thread_event_hooks) { rb_thread_execute_hooks(event, th); }
|
|
||||||
|
#if 0
|
||||||
|
static const char *
|
||||||
|
event_name(rb_event_flag_t event)
|
||||||
|
{
|
||||||
|
switch (event) {
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
|
||||||
|
return "STARTED";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_READY:
|
||||||
|
return "READY";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
|
||||||
|
return "RESUMED";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
|
||||||
|
return "SUSPENDED";
|
||||||
|
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
|
||||||
|
return "EXITED";
|
||||||
|
}
|
||||||
|
return "no-event";
|
||||||
|
}
|
||||||
|
|
||||||
|
#define RB_INTERNAL_THREAD_HOOK(event, th) \
|
||||||
|
if (UNLIKELY(rb_internal_thread_event_hooks)) { \
|
||||||
|
fprintf(stderr, "[thread=%"PRIxVALUE"] %s in %s (%s:%d)\n", th->self, event_name(event), __func__, __FILE__, __LINE__); \
|
||||||
|
rb_thread_execute_hooks(event, th); \
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
#define RB_INTERNAL_THREAD_HOOK(event, th) if (UNLIKELY(rb_internal_thread_event_hooks)) { rb_thread_execute_hooks(event, th); }
|
||||||
|
#endif
|
||||||
|
|
||||||
static rb_serial_t current_fork_gen = 1; /* We can't use GET_VM()->fork_gen */
|
static rb_serial_t current_fork_gen = 1; /* We can't use GET_VM()->fork_gen */
|
||||||
|
|
||||||
|
@ -958,9 +985,7 @@ thread_sched_wakeup_next_thread(struct rb_thread_sched *sched, rb_thread_t *th,
|
||||||
static void
|
static void
|
||||||
thread_sched_to_waiting_common0(struct rb_thread_sched *sched, rb_thread_t *th, bool to_dead)
|
thread_sched_to_waiting_common0(struct rb_thread_sched *sched, rb_thread_t *th, bool to_dead)
|
||||||
{
|
{
|
||||||
if (rb_internal_thread_event_hooks) {
|
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
||||||
rb_thread_execute_hooks(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!to_dead) native_thread_dedicated_inc(th->vm, th->ractor, th->nt);
|
if (!to_dead) native_thread_dedicated_inc(th->vm, th->ractor, th->nt);
|
||||||
|
|
||||||
|
@ -975,9 +1000,8 @@ static void
|
||||||
thread_sched_to_dead_common(struct rb_thread_sched *sched, rb_thread_t *th)
|
thread_sched_to_dead_common(struct rb_thread_sched *sched, rb_thread_t *th)
|
||||||
{
|
{
|
||||||
RUBY_DEBUG_LOG("dedicated:%d", th->nt->dedicated);
|
RUBY_DEBUG_LOG("dedicated:%d", th->nt->dedicated);
|
||||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_EXITED, th);
|
|
||||||
|
|
||||||
thread_sched_to_waiting_common0(sched, th, true);
|
thread_sched_to_waiting_common0(sched, th, true);
|
||||||
|
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_EXITED, th);
|
||||||
}
|
}
|
||||||
|
|
||||||
// running -> dead
|
// running -> dead
|
||||||
|
@ -1007,8 +1031,6 @@ thread_sched_to_waiting_common(struct rb_thread_sched *sched, rb_thread_t *th)
|
||||||
static void
|
static void
|
||||||
thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th)
|
thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th)
|
||||||
{
|
{
|
||||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
|
||||||
|
|
||||||
thread_sched_lock(sched, th);
|
thread_sched_lock(sched, th);
|
||||||
{
|
{
|
||||||
thread_sched_to_waiting_common(sched, th);
|
thread_sched_to_waiting_common(sched, th);
|
||||||
|
@ -1046,6 +1068,7 @@ ubf_waiting(void *ptr)
|
||||||
// not sleeping yet.
|
// not sleeping yet.
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
||||||
thread_sched_to_ready_common(sched, th, true, false);
|
thread_sched_to_ready_common(sched, th, true, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1089,6 +1112,7 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
|
||||||
thread_sched_lock(sched, th);
|
thread_sched_lock(sched, th);
|
||||||
{
|
{
|
||||||
if (!ccan_list_empty(&sched->readyq)) {
|
if (!ccan_list_empty(&sched->readyq)) {
|
||||||
|
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
||||||
thread_sched_wakeup_next_thread(sched, th, !th_has_dedicated_nt(th));
|
thread_sched_wakeup_next_thread(sched, th, !th_has_dedicated_nt(th));
|
||||||
bool can_direct_transfer = !th_has_dedicated_nt(th);
|
bool can_direct_transfer = !th_has_dedicated_nt(th);
|
||||||
thread_sched_to_ready_common(sched, th, false, can_direct_transfer);
|
thread_sched_to_ready_common(sched, th, false, can_direct_transfer);
|
||||||
|
@ -2148,8 +2172,6 @@ native_thread_create_dedicated(rb_thread_t *th)
|
||||||
static void
|
static void
|
||||||
call_thread_start_func_2(rb_thread_t *th)
|
call_thread_start_func_2(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_STARTED, th);
|
|
||||||
|
|
||||||
#if defined USE_NATIVE_THREAD_INIT
|
#if defined USE_NATIVE_THREAD_INIT
|
||||||
native_thread_init_stack(th);
|
native_thread_init_stack(th);
|
||||||
thread_start_func_2(th, th->ec->machine.stack_start);
|
thread_start_func_2(th, th->ec->machine.stack_start);
|
||||||
|
@ -2309,6 +2331,7 @@ native_thread_create(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
VM_ASSERT(th->nt == 0);
|
VM_ASSERT(th->nt == 0);
|
||||||
RUBY_DEBUG_LOG("th:%d has_dnt:%d", th->serial, th->has_dedicated_nt);
|
RUBY_DEBUG_LOG("th:%d has_dnt:%d", th->serial, th->has_dedicated_nt);
|
||||||
|
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_STARTED, th);
|
||||||
|
|
||||||
if (!th->ractor->threads.sched.enable_mn_threads) {
|
if (!th->ractor->threads.sched.enable_mn_threads) {
|
||||||
th->has_dedicated_nt = 1;
|
th->has_dedicated_nt = 1;
|
||||||
|
@ -3232,7 +3255,6 @@ static void
|
||||||
native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
|
native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
|
||||||
{
|
{
|
||||||
struct rb_thread_sched *sched = TH_SCHED(th);
|
struct rb_thread_sched *sched = TH_SCHED(th);
|
||||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
|
||||||
|
|
||||||
RUBY_DEBUG_LOG("rel:%d", rel ? (int)*rel : 0);
|
RUBY_DEBUG_LOG("rel:%d", rel ? (int)*rel : 0);
|
||||||
if (rel) {
|
if (rel) {
|
||||||
|
@ -3248,7 +3270,6 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
|
||||||
}
|
}
|
||||||
|
|
||||||
RUBY_DEBUG_LOG("wakeup");
|
RUBY_DEBUG_LOG("wakeup");
|
||||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_READY, th);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// thread internal event hooks (only for pthread)
|
// thread internal event hooks (only for pthread)
|
||||||
|
|
Загрузка…
Ссылка в новой задаче