diff --git a/.document b/.document index 5494bcc7fe..ec2fa09326 100644 --- a/.document +++ b/.document @@ -24,6 +24,7 @@ pack.rb ractor.rb string.rb timev.rb +thread_sync.rb trace_point.rb warning.rb diff --git a/common.mk b/common.mk index aeb87dfb55..4c49690e4a 100644 --- a/common.mk +++ b/common.mk @@ -1062,6 +1062,7 @@ BUILTIN_RB_SRCS = \ $(srcdir)/kernel.rb \ $(srcdir)/ractor.rb \ $(srcdir)/timev.rb \ + $(srcdir)/thread_sync.rb \ $(srcdir)/nilclass.rb \ $(srcdir)/prelude.rb \ $(srcdir)/gem_prelude.rb \ @@ -9447,6 +9448,7 @@ miniinit.$(OBJEXT): {$(VPATH)}st.h miniinit.$(OBJEXT): {$(VPATH)}subst.h miniinit.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h miniinit.$(OBJEXT): {$(VPATH)}thread_native.h +miniinit.$(OBJEXT): {$(VPATH)}thread_sync.rb miniinit.$(OBJEXT): {$(VPATH)}timev.rb miniinit.$(OBJEXT): {$(VPATH)}trace_point.rb miniinit.$(OBJEXT): {$(VPATH)}vm_core.h @@ -15230,6 +15232,7 @@ thread.$(OBJEXT): {$(VPATH)}backward/2/limits.h thread.$(OBJEXT): {$(VPATH)}backward/2/long_long.h thread.$(OBJEXT): {$(VPATH)}backward/2/stdalign.h thread.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h +thread.$(OBJEXT): {$(VPATH)}builtin.h thread.$(OBJEXT): {$(VPATH)}config.h thread.$(OBJEXT): {$(VPATH)}debug.h thread.$(OBJEXT): {$(VPATH)}debug_counter.h @@ -15412,6 +15415,8 @@ thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h thread.$(OBJEXT): {$(VPATH)}thread_native.h thread.$(OBJEXT): {$(VPATH)}thread_sync.c +thread.$(OBJEXT): {$(VPATH)}thread_sync.rb +thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc thread.$(OBJEXT): {$(VPATH)}timev.h thread.$(OBJEXT): {$(VPATH)}vm_core.h thread.$(OBJEXT): {$(VPATH)}vm_debug.h diff --git a/hrtime.h b/hrtime.h index 4ac3d54723..80aff5deb3 100644 --- a/hrtime.h +++ b/hrtime.h @@ -36,6 +36,7 @@ #define RB_HRTIME_PER_MSEC (RB_HRTIME_PER_USEC * (rb_hrtime_t)1000) #define RB_HRTIME_PER_SEC (RB_HRTIME_PER_MSEC * (rb_hrtime_t)1000) #define RB_HRTIME_MAX UINT64_MAX +#define RB_HRTIME_MIN ((rb_hrtime_t)0) /* * Lets try to support time travelers. Lets assume anybody with a time machine @@ -91,6 +92,15 @@ rb_hrtime_add(rb_hrtime_t a, rb_hrtime_t b) return c; } +static inline rb_hrtime_t +rb_hrtime_sub(rb_hrtime_t a, rb_hrtime_t b) +{ + if (a < b) { + return RB_HRTIME_MIN; + } + return a - b; +} + /* * convert a timeval struct to rb_hrtime_t, clamping at RB_HRTIME_MAX */ diff --git a/inits.c b/inits.c index f41e88d838..22ba6d5a8c 100644 --- a/inits.c +++ b/inits.c @@ -98,6 +98,7 @@ rb_call_builtin_inits(void) BUILTIN(array); BUILTIN(kernel); BUILTIN(timev); + BUILTIN(thread_sync); BUILTIN(yjit); BUILTIN(nilclass); BUILTIN(marshal); diff --git a/spec/ruby/shared/queue/deque.rb b/spec/ruby/shared/queue/deque.rb index 8b755dd9b7..ed32bd29c8 100644 --- a/spec/ruby/shared/queue/deque.rb +++ b/spec/ruby/shared/queue/deque.rb @@ -55,6 +55,61 @@ describe :queue_deq, shared: true do t.join end + describe "with a timeout" do + ruby_version_is "3.2" do + it "returns an item if one is available in time" do + q = @object.call + + t = Thread.new { + q.send(@method, timeout: 1).should == 1 + } + Thread.pass until t.status == "sleep" && q.num_waiting == 1 + q << 1 + t.join + end + + it "returns nil if no item is available in time" do + q = @object.call + + t = Thread.new { + q.send(@method, timeout: 0.1).should == nil + } + t.join + end + + it "does nothing if the timeout is nil" do + q = @object.call + t = Thread.new { + q.send(@method, timeout: nil).should == 1 + } + t.join(0.2).should == nil + q << 1 + t.join + end + + it "raise TypeError if timeout is not a valid numeric" do + q = @object.call + -> { q.send(@method, timeout: "1") }.should raise_error( + TypeError, + "no implicit conversion to float from string", + ) + + -> { q.send(@method, timeout: false) }.should raise_error( + TypeError, + "no implicit conversion to float from false", + ) + end + + it "raise ArgumentError if non_block = true is passed too" do + q = @object.call + -> { q.send(@method, true, timeout: 1) }.should raise_error( + ArgumentError, + "can't set a timeout if non_block is enabled", + ) + end + end + end + describe "in non-blocking mode" do it "removes an item from the queue" do q = @object.call diff --git a/test/ruby/test_settracefunc.rb b/test/ruby/test_settracefunc.rb index 56d457c7d7..31946c8b71 100644 --- a/test/ruby/test_settracefunc.rb +++ b/test/ruby/test_settracefunc.rb @@ -2140,17 +2140,16 @@ CODE m2t_q.push 1 t.join - assert_equal ["c-return", base_line + 31], events[0] - assert_equal ["line", base_line + 32], events[1] - assert_equal ["line", base_line + 33], events[2] - assert_equal ["call", base_line + -6], events[3] - assert_equal ["return", base_line + -4], events[4] - assert_equal ["line", base_line + 34], events[5] - assert_equal ["line", base_line + 35], events[6] - assert_equal ["c-call", base_line + 35], events[7] # Thread.current - assert_equal ["c-return", base_line + 35], events[8] # Thread.current - assert_equal ["c-call", base_line + 35], events[9] # Thread#set_trace_func - assert_equal nil, events[10] + assert_equal ["line", base_line + 32], events[0] + assert_equal ["line", base_line + 33], events[1] + assert_equal ["call", base_line + -6], events[2] + assert_equal ["return", base_line + -4], events[3] + assert_equal ["line", base_line + 34], events[4] + assert_equal ["line", base_line + 35], events[5] + assert_equal ["c-call", base_line + 35], events[6] # Thread.current + assert_equal ["c-return", base_line + 35], events[7] # Thread.current + assert_equal ["c-call", base_line + 35], events[8] # Thread#set_trace_func + assert_equal nil, events[9] end def test_lineno_in_optimized_insn diff --git a/test/ruby/test_thread_queue.rb b/test/ruby/test_thread_queue.rb index ebf7ded3b9..aa4ea0a400 100644 --- a/test/ruby/test_thread_queue.rb +++ b/test/ruby/test_thread_queue.rb @@ -111,6 +111,23 @@ class TestThreadQueue < Test::Unit::TestCase assert_equal(0, q.num_waiting) end + def test_queue_pop_timeout + q = Thread::Queue.new + q << 1 + assert_equal 1, q.pop(timeout: 1) + + t1 = Thread.new { q.pop(timeout: 1) } + assert_equal t1, t1.join(2) + assert_nil t1.value + + t2 = Thread.new { q.pop(timeout: 0.1) } + assert_equal t2, t2.join(0.2) + assert_nil t2.value + ensure + t1&.kill + t2&.kill + end + def test_queue_pop_non_block q = Thread::Queue.new assert_raise_with_message(ThreadError, /empty/) do @@ -126,6 +143,24 @@ class TestThreadQueue < Test::Unit::TestCase assert_equal(0, q.num_waiting) end + def test_sized_queue_pop_timeout + q = Thread::SizedQueue.new(1) + + q << 1 + assert_equal 1, q.pop(timeout: 1) + + t1 = Thread.new { q.pop(timeout: 1) } + assert_equal t1, t1.join(2) + assert_nil t1.value + + t2 = Thread.new { q.pop(timeout: 0.1) } + assert_equal t2, t2.join(0.2) + assert_nil t2.value + ensure + t1&.kill + t2&.kill + end + def test_sized_queue_pop_non_block q = Thread::SizedQueue.new(1) assert_raise_with_message(ThreadError, /empty/) do diff --git a/thread.c b/thread.c index 411b6d7084..feb89d4352 100644 --- a/thread.c +++ b/thread.c @@ -132,7 +132,7 @@ rb_thread_local_storage(VALUE thread) static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl); static void sleep_forever(rb_thread_t *th, unsigned int fl); -static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker); +static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end); static int rb_threadptr_dead(rb_thread_t *th); static void rb_check_deadlock(rb_ractor_t *r); static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); @@ -1328,6 +1328,28 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) return woke; } +static int +sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl) +{ + enum rb_thread_status prev_status = th->status; + int woke; + rb_hrtime_t rel = rb_hrtime_sub(end, rb_hrtime_now()); + + th->status = THREAD_STOPPED; + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + while (th->status == THREAD_STOPPED) { + native_sleep(th, &rel); + woke = vm_check_ints_blocking(th->ec); + if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) + break; + if (hrtime_update_expire(&rel, end)) + break; + woke = 1; + } + th->status = prev_status; + return woke; +} + void rb_thread_sleep_forever(void) { @@ -1355,15 +1377,20 @@ rb_thread_sleep_interruptible(void) } static void -rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker) +rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - rb_fiber_scheduler_block(scheduler, blocker, Qnil); + rb_fiber_scheduler_block(scheduler, blocker, timeout); } else { RUBY_DEBUG_LOG("%s", ""); - sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + if (end) { + sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK); + } + else { + sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + } } } diff --git a/thread_sync.c b/thread_sync.c index 5ff36dd01d..1a0f3ee855 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1,5 +1,6 @@ /* included by thread.c */ #include "ccan/list/list.h" +#include "builtin.h" static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; @@ -19,6 +20,12 @@ struct sync_waiter { struct ccan_list_node node; }; +struct queue_sleep_arg { + VALUE self; + VALUE timeout; + rb_hrtime_t end; +}; + #define MUTEX_ALLOW_TRAP FL_USER1 static void @@ -514,7 +521,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes) static VALUE rb_mutex_sleep_forever(VALUE self) { - rb_thread_sleep_deadly_allow_spurious_wakeup(self); + rb_thread_sleep_deadly_allow_spurious_wakeup(self, Qnil, 0); return Qnil; } @@ -706,6 +713,21 @@ queue_ptr(VALUE obj) #define QUEUE_CLOSED FL_USER5 +static rb_hrtime_t +queue_timeout2hrtime(VALUE timeout) { + if (NIL_P(timeout)) { + return (rb_hrtime_t)0; + } + rb_hrtime_t rel = 0; + if (FIXNUM_P(timeout)) { + rel = rb_sec2hrtime(NUM2TIMET(timeout)); + } + else { + double2hrtime(&rel, rb_num2dbl(timeout)); + } + return rb_hrtime_add(rel, rb_hrtime_now()); +} + static void szqueue_mark(void *ptr) { @@ -964,9 +986,10 @@ rb_queue_push(VALUE self, VALUE obj) } static VALUE -queue_sleep(VALUE self) +queue_sleep(VALUE _args) { - rb_thread_sleep_deadly_allow_spurious_wakeup(self); + struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args; + rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end); return Qnil; } @@ -1001,9 +1024,10 @@ szqueue_sleep_done(VALUE p) } static VALUE -queue_do_pop(VALUE self, struct rb_queue *q, int should_block) +queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout) { check_array(self, q->que); + rb_hrtime_t end = queue_timeout2hrtime(timeout); while (RARRAY_LEN(q->que) == 0) { if (!should_block) { @@ -1028,43 +1052,25 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) ccan_list_add_tail(waitq, &queue_waiter.w.node); queue_waiter.as.q->num_waiting++; - rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter); + struct queue_sleep_arg queue_sleep_arg = { + .self = self, + .timeout = timeout, + .end = end + }; + + rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter); + if (!NIL_P(timeout) && (rb_hrtime_now() >= end)) + break; } } return rb_ary_shift(q->que); } -static int -queue_pop_should_block(int argc, const VALUE *argv) -{ - int should_block = 1; - rb_check_arity(argc, 0, 1); - if (argc > 0) { - should_block = !RTEST(argv[0]); - } - return should_block; -} - -/* - * Document-method: Thread::Queue#pop - * call-seq: - * pop(non_block=false) - * deq(non_block=false) - * shift(non_block=false) - * - * Retrieves data from the queue. - * - * If the queue is empty, the calling thread is suspended until data is pushed - * onto the queue. If +non_block+ is true, the thread isn't suspended, and - * +ThreadError+ is raised. - */ - static VALUE -rb_queue_pop(int argc, VALUE *argv, VALUE self) +rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout) { - int should_block = queue_pop_should_block(argc, argv); - return queue_do_pop(self, queue_ptr(self), should_block); + return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout); } /* @@ -1283,10 +1289,10 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) } static VALUE -szqueue_do_pop(VALUE self, int should_block) +szqueue_do_pop(VALUE self, int should_block, VALUE timeout) { struct rb_szqueue *sq = szqueue_ptr(self); - VALUE retval = queue_do_pop(self, &sq->q, should_block); + VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout); if (queue_length(self, &sq->q) < sq->max) { wakeup_one(szqueue_pushq(sq)); @@ -1294,26 +1300,10 @@ szqueue_do_pop(VALUE self, int should_block) return retval; } - -/* - * Document-method: Thread::SizedQueue#pop - * call-seq: - * pop(non_block=false) - * deq(non_block=false) - * shift(non_block=false) - * - * Retrieves data from the queue. - * - * If the queue is empty, the calling thread is suspended until data is pushed - * onto the queue. If +non_block+ is true, the thread isn't suspended, and - * +ThreadError+ is raised. - */ - static VALUE -rb_szqueue_pop(int argc, VALUE *argv, VALUE self) +rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout) { - int should_block = queue_pop_should_block(argc, argv); - return szqueue_do_pop(self, should_block); + return szqueue_do_pop(self, !RTEST(non_block), timeout); } /* @@ -1597,7 +1587,6 @@ Init_thread_sync(void) rb_define_method(rb_cQueue, "close", rb_queue_close, 0); rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); rb_define_method(rb_cQueue, "push", rb_queue_push, 1); - rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); @@ -1605,8 +1594,6 @@ Init_thread_sync(void) rb_define_alias(rb_cQueue, "enq", "push"); rb_define_alias(rb_cQueue, "<<", "push"); - rb_define_alias(rb_cQueue, "deq", "pop"); - rb_define_alias(rb_cQueue, "shift", "pop"); rb_define_alias(rb_cQueue, "size", "length"); DEFINE_CLASS(SizedQueue, Queue); @@ -1617,16 +1604,12 @@ Init_thread_sync(void) rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1); - rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1); rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0); rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0); rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0); rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); - rb_define_alias(rb_cSizedQueue, "enq", "push"); rb_define_alias(rb_cSizedQueue, "<<", "push"); - rb_define_alias(rb_cSizedQueue, "deq", "pop"); - rb_define_alias(rb_cSizedQueue, "shift", "pop"); rb_define_alias(rb_cSizedQueue, "size", "length"); /* CVar */ @@ -1644,3 +1627,5 @@ Init_thread_sync(void) rb_provide("thread.rb"); } + +#include "thread_sync.rbinc" diff --git a/thread_sync.rb b/thread_sync.rb new file mode 100644 index 0000000000..d567ca51af --- /dev/null +++ b/thread_sync.rb @@ -0,0 +1,45 @@ +class Thread + class Queue + # call-seq: + # pop(non_block=false, timeout: nil) + # + # Retrieves data from the queue. + # + # If the queue is empty, the calling thread is suspended until data is pushed + # onto the queue. If +non_block+ is true, the thread isn't suspended, and + # +ThreadError+ is raised. + # + # If +timeout+ seconds have passed and no data is available +nil+ is + # returned. + def pop(non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + Primitive.rb_queue_pop(non_block, timeout) + end + alias_method :deq, :pop + alias_method :shift, :pop + end + + class SizedQueue + # call-seq: + # pop(non_block=false, timeout: nil) + # + # Retrieves data from the queue. + # + # If the queue is empty, the calling thread is suspended until data is + # pushed onto the queue. If +non_block+ is true, the thread isn't + # suspended, and +ThreadError+ is raised. + # + # If +timeout+ seconds have passed and no data is available +nil+ is + # returned. + def pop(non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + Primitive.rb_szqueue_pop(non_block, timeout) + end + alias_method :deq, :pop + alias_method :shift, :pop + end +end