Further fix the GVL instrumentation API

Followup: https://github.com/ruby/ruby/pull/9029

[Bug #20019]

Some events still weren't triggered from the right place.

The test suite was also improved a bit more.
This commit is contained in:
Jean Boussier 2023-11-28 11:03:13 +01:00 коммит произвёл Jean Boussier
Родитель 7bd172744f
Коммит 982641939c
3 изменённых файлов: 114 добавлений и 53 удалений

Просмотреть файл

@ -73,53 +73,51 @@ event_name(rb_event_flag_t event)
return "no-event";
}
NORETURN(static void unexpected(const char *format, const char *event_name, VALUE thread));
static void
unexpected(const char *format, const char *event_name, VALUE thread)
unexpected(bool strict, const char *format, VALUE thread, rb_event_flag_t last_event)
{
#if 0
fprintf(stderr, "----------------\n");
fprintf(stderr, format, event_name, thread);
fprintf(stderr, "\n");
rb_backtrace();
fprintf(stderr, "----------------\n");
#else
rb_bug(format, event_name, thread);
#endif
const char *last_event_name = event_name(last_event);
if (strict) {
rb_bug(format, thread, last_event_name);
}
else {
fprintf(stderr, format, thread, last_event_name);
fprintf(stderr, "\n");
}
}
static void
ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
{
rb_event_flag_t last_event = find_last_event(event_data->thread);
bool strict = (bool)user_data;
switch (event) {
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
if (last_event != 0) {
unexpected("TestThreadInstrumentation: `started` event can't be preceded by `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
if (last_event != 0) {
switch (event) {
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
unexpected(strict, "[thread=%"PRIxVALUE"] `started` event can't be preceded by `%s`", event_data->thread, last_event);
break;
case RUBY_INTERNAL_THREAD_EVENT_READY:
if (last_event != RUBY_INTERNAL_THREAD_EVENT_STARTED && last_event != RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
unexpected(strict, "[thread=%"PRIxVALUE"] `ready` must be preceded by `started` or `suspended`, got: `%s`", event_data->thread, last_event);
}
break;
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
if (last_event != RUBY_INTERNAL_THREAD_EVENT_READY) {
unexpected(strict, "[thread=%"PRIxVALUE"] `resumed` must be preceded by `ready`, got: `%s`", event_data->thread, last_event);
}
break;
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
if (last_event != RUBY_INTERNAL_THREAD_EVENT_RESUMED) {
unexpected(strict, "[thread=%"PRIxVALUE"] `suspended` must be preceded by `resumed`, got: `%s`", event_data->thread, last_event);
}
break;
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
if (last_event != RUBY_INTERNAL_THREAD_EVENT_RESUMED && last_event != RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
unexpected(strict, "[thread=%"PRIxVALUE"] `exited` must be preceded by `resumed` or `suspended`, got: `%s`", event_data->thread, last_event);
}
break;
}
break;
case RUBY_INTERNAL_THREAD_EVENT_READY:
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_STARTED && last_event != RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
unexpected("TestThreadInstrumentation: `ready` must be preceded by `started` or `suspended`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
}
break;
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_READY) {
unexpected("TestThreadInstrumentation: `resumed` must be preceded by `ready`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
}
break;
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_RESUMED) {
unexpected("TestThreadInstrumentation: `suspended` must be preceded by `resumed`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
}
break;
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
if (last_event != 0 && last_event != RUBY_INTERNAL_THREAD_EVENT_RESUMED && last_event != RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
unexpected("TestThreadInstrumentation: `exited` must be preceded by `resumed` or `suspended`, got: `%s` (thread=%"PRIxVALUE")", event_name(last_event), event_data->thread);
}
break;
}
rb_atomic_t cursor = RUBY_ATOMIC_FETCH_ADD(timeline_cursor, 1);
@ -134,7 +132,7 @@ ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_
static rb_internal_thread_event_hook_t * single_hook = NULL;
static VALUE
thread_register_callback(VALUE thread)
thread_register_callback(VALUE thread, VALUE strict)
{
single_hook = rb_internal_thread_add_event_hook(
ex_callback,
@ -143,7 +141,7 @@ thread_register_callback(VALUE thread)
RUBY_INTERNAL_THREAD_EVENT_RESUMED |
RUBY_INTERNAL_THREAD_EVENT_SUSPENDED |
RUBY_INTERNAL_THREAD_EVENT_EXITED,
NULL
(void *)RTEST(strict)
);
return Qnil;
@ -216,7 +214,7 @@ Init_instrumentation(void)
timeline_value = TypedData_Wrap_Struct(0, &event_timeline_type, 0);
rb_global_variable(&last_thread);
rb_define_singleton_method(klass, "register_callback", thread_register_callback, 0);
rb_define_singleton_method(klass, "register_callback", thread_register_callback, 1);
rb_define_singleton_method(klass, "unregister_callback", thread_unregister_callback, 0);
rb_define_singleton_method(klass, "register_and_unregister_callbacks", thread_register_and_unregister_callback, 0);
}

Просмотреть файл

@ -29,18 +29,44 @@ class TestThreadInstrumentation < Test::Unit::TestCase
thread&.kill
end
def test_thread_pass_single_thread
full_timeline = record do
Thread.pass
end
assert_equal [], timeline_for(Thread.current, full_timeline)
end
def test_thread_pass_multi_thread
thread = Thread.new do
cpu_bound_work(0.5)
end
full_timeline = record do
Thread.pass
end
assert_equal %i(suspended ready resumed), timeline_for(Thread.current, full_timeline)
ensure
thread&.kill
thread&.join
end
def test_muti_thread_timeline
threads = nil
full_timeline = record do
threads = threaded_cpu_work
fib(20)
threads = threaded_cpu_bound_work(1.0)
expected = cpu_bound_work(1.0)
results = threads.map(&:value)
results.each do |r|
refute_equal false, r
end
assert_equal [false] * THREADS_COUNT, threads.map(&:status)
end
threads.each do |thread|
timeline = timeline_for(thread, full_timeline)
assert_consistent_timeline(timeline)
assert timeline.count(:suspended) > 1, "Expected threads to yield suspended at least once: #{timeline.inspect}"
end
timeline = timeline_for(Thread.current, full_timeline)
@ -105,7 +131,7 @@ class TestThreadInstrumentation < Test::Unit::TestCase
assert_equal %i(started ready resumed suspended ready resumed suspended exited), timeline
end
def test_thread_blocked_forever
def test_thread_blocked_forever_on_mutex
mutex = Mutex.new
mutex.lock
thread = nil
@ -123,7 +149,30 @@ class TestThreadInstrumentation < Test::Unit::TestCase
timeline = timeline_for(thread, full_timeline)
assert_consistent_timeline(timeline)
assert_equal %i(started ready resumed), timeline
assert_equal %i(started ready resumed suspended), timeline
end
def test_thread_blocked_temporarily_on_mutex
mutex = Mutex.new
mutex.lock
thread = nil
full_timeline = record do
thread = Thread.new do
mutex.lock
end
10.times { Thread.pass }
sleep 0.1
mutex.unlock
10.times { Thread.pass }
sleep 0.1
end
thread.join
timeline = timeline_for(thread, full_timeline)
assert_consistent_timeline(timeline)
assert_equal %i(started ready resumed suspended ready resumed suspended exited), timeline
end
def test_thread_instrumentation_fork_safe
@ -135,7 +184,7 @@ class TestThreadInstrumentation < Test::Unit::TestCase
thread_statuses = Marshal.load(read_pipe)
full_timeline = Marshal.load(read_pipe)
else
threads = threaded_cpu_work
threads = threaded_cpu_bound_work.each(&:join)
Marshal.dump(threads.map(&:status), STDOUT)
full_timeline = Bug::ThreadInstrumentation.unregister_callback.map { |t, e| [t.to_s, e ] }
Marshal.dump(full_timeline, STDOUT)
@ -157,7 +206,7 @@ class TestThreadInstrumentation < Test::Unit::TestCase
private
def record
Bug::ThreadInstrumentation.register_callback
Bug::ThreadInstrumentation.register_callback(!ENV["GVL_DEBUG"])
yield
ensure
timeline = Bug::ThreadInstrumentation.unregister_callback
@ -202,13 +251,27 @@ class TestThreadInstrumentation < Test::Unit::TestCase
timeline.select { |t, _| t == thread }.map(&:last)
end
def fib(n = 20)
def fib(n = 30)
return n if n <= 1
fib(n-1) + fib(n-2)
end
def threaded_cpu_work(size = 20)
THREADS_COUNT.times.map { Thread.new { fib(size) } }.each(&:join)
def cpu_bound_work(duration)
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration
i = 0
while deadline > Process.clock_gettime(Process::CLOCK_MONOTONIC)
fib(25)
i += 1
end
i > 0 ? i : false
end
def threaded_cpu_bound_work(duration = 0.5)
THREADS_COUNT.times.map do
Thread.new do
cpu_bound_work(duration)
end
end
end
def cleanup_threads

Просмотреть файл

@ -799,6 +799,7 @@ thread_sched_to_ready_common(struct rb_thread_sched *sched, rb_thread_t *th, boo
VM_ASSERT(sched->running != th);
VM_ASSERT(!thread_sched_readyq_contain_p(sched, th));
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_READY, th);
if (sched->running == NULL) {
thread_sched_set_running(sched, th);
@ -807,8 +808,6 @@ thread_sched_to_ready_common(struct rb_thread_sched *sched, rb_thread_t *th, boo
else {
thread_sched_enq(sched, th);
}
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_READY, th);
}
// waiting -> ready
@ -1068,7 +1067,6 @@ ubf_waiting(void *ptr)
// not sleeping yet.
}
else {
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
thread_sched_to_ready_common(sched, th, true, false);
}
}
@ -1086,6 +1084,8 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t
RB_VM_SAVE_MACHINE_CONTEXT(th);
setup_ubf(th, ubf_waiting, (void *)th);
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
thread_sched_lock(sched, th);
{
if (!RUBY_VM_INTERRUPTED(th->ec)) {