From 7f175e564875b011efb43537907867dd08d659e8 Mon Sep 17 00:00:00 2001
From: Samuel Williams
Date: Thu, 20 Oct 2022 13:38:52 +1300
Subject: [PATCH] Avoid missed wakeup with fiber scheduler and Fiber.blocking.
 (#6588)

* Ensure that blocked fibers don't prevent valid wakeups.
---
 test/fiber/scheduler.rb      | 11 ++++++++++
 test/fiber/test_scheduler.rb | 42 ++++++++++++++++++++++++++++++++++++
 thread.c                     |  4 ++--
 thread_sync.c                | 25 ++++++++++++++-------
 4 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 3fd41ef6f1..204a297133 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -350,3 +350,14 @@ class SleepingUnblockScheduler < Scheduler
     sleep(0.1)
   end
 end
+
+class SleepingBlockingScheduler < Scheduler
+  def kernel_sleep(duration = nil)
+    # Deliberately sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
+    Fiber.blocking{sleep 0.0001}
+
+    self.block(:sleep, duration)
+
+    return true
+  end
+end
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 5a24bff04f..300d30ad63 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -138,4 +138,46 @@ class TestFiberScheduler < Test::Unit::TestCase
       Object.send(:remove_const, :TestFiberSchedulerAutoload)
     end
   end
+
+  def test_deadlock
+    mutex = Thread::Mutex.new
+    condition = Thread::ConditionVariable.new
+    q = 0.0001
+
+    signaller = Thread.new do
+      loop do
+        mutex.synchronize do
+          condition.signal
+        end
+        sleep q
+      end
+    end
+
+    i = 0
+
+    thread = Thread.new do
+      scheduler = SleepingBlockingScheduler.new
+      Fiber.set_scheduler scheduler
+
+      Fiber.schedule do
+        10.times do
+          mutex.synchronize do
+            condition.wait(mutex)
+            sleep q
+            i += 1
+          end
+        end
+      end
+    end
+
+    # Wait for 10 seconds at most... if it doesn't finish, it's deadlocked.
+    thread.join(10)
+
+    # If it's deadlocked, it will never finish, so this will be 0.
+    assert_equal 10, i
+  ensure
+    # Make sure the threads are dead...
+    thread.kill
+    signaller.kill
+  end
 end
diff --git a/thread.c b/thread.c
index e1b194861a..d8925e618e 100644
--- a/thread.c
+++ b/thread.c
@@ -404,7 +404,7 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
 
         rb_thread_t *target_thread = join_list->thread;
 
-        if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
+        if (target_thread->scheduler != Qnil && join_list->fiber) {
             rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
         }
         else {
@@ -1091,7 +1091,7 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
         struct rb_waiting_list waiter;
         waiter.next = target_th->join_list;
         waiter.thread = th;
-        waiter.fiber = fiber;
+        waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
         target_th->join_list = &waiter;
 
         struct join_arg arg;
diff --git a/thread_sync.c b/thread_sync.c
index 3888534468..2bcf59137e 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -20,6 +20,16 @@ struct sync_waiter {
     struct ccan_list_node node;
 };
 
+static inline rb_fiber_t*
+nonblocking_fiber(rb_fiber_t *fiber)
+{
+    if (rb_fiberptr_blocking(fiber)) {
+        return NULL;
+    }
+
+    return fiber;
+}
+
 struct queue_sleep_arg {
     VALUE self;
     VALUE timeout;
@@ -37,8 +47,7 @@ sync_wakeup(struct ccan_list_head *head, long max)
         ccan_list_del_init(&cur->node);
 
         if (cur->th->status != THREAD_KILLED) {
-
-            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
+            if (cur->th->scheduler != Qnil && cur->fiber) {
                 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
             }
             else {
@@ -306,7 +315,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 struct sync_waiter sync_waiter = {
                     .self = self,
                     .th = th,
-                    .fiber = fiber
+                    .fiber = nonblocking_fiber(fiber)
                 };
 
                 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
@@ -339,7 +348,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 struct sync_waiter sync_waiter = {
                     .self = self,
                     .th = th,
-                    .fiber = fiber
+                    .fiber = nonblocking_fiber(fiber)
                 };
 
                 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
@@ -437,7 +446,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
         ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
             ccan_list_del_init(&cur->node);
 
-            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
+            if (cur->th->scheduler != Qnil && cur->fiber) {
                 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                 goto found;
             }
@@ -1051,7 +1060,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
         assert(queue_closed_p(self) == 0);
 
         struct queue_waiter queue_waiter = {
-            .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
+            .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
             .as = {.q = q}
         };
 
@@ -1258,7 +1267,7 @@ rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_
     else {
         rb_execution_context_t *ec = GET_EC();
         struct queue_waiter queue_waiter = {
-            .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
+            .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
            .as = {.sq = sq}
         };
 
@@ -1491,7 +1500,7 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
     struct sync_waiter sync_waiter = {
         .self = args.mutex,
         .th = ec->thread_ptr,
-        .fiber = ec->fiber_ptr
+        .fiber = nonblocking_fiber(ec->fiber_ptr)
     };
 
     ccan_list_add_tail(&cv->waitq, &sync_waiter.node);