From a2d63ea2fb84c962abddae877e9493fc57cfce1a Mon Sep 17 00:00:00 2001 From: normal Date: Tue, 27 Mar 2018 09:28:37 +0000 Subject: [PATCH] thread_sync.c: avoid reaching across stacks of dead threads rb_ensure is insufficient cleanup for fork and we must reinitialize all waitqueues in the child process. Unfortunately this increases the footprint of ConditionVariable, Queue and SizedQueue by 8 bytes on 32-bit (16 bytes on 64-bit). [ruby-core:86316] [Bug #14634] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@62934 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- test/thread/test_cv.rb | 19 ++++++++++ test/thread/test_queue.rb | 48 +++++++++++++++++++++++++ thread.c | 2 ++ thread_sync.c | 75 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 139 insertions(+), 5 deletions(-) diff --git a/test/thread/test_cv.rb b/test/thread/test_cv.rb index 70cf4483a3..a093f373b1 100644 --- a/test/thread/test_cv.rb +++ b/test/thread/test_cv.rb @@ -219,4 +219,23 @@ INPUT Marshal.dump(condvar) end end + + def test_condvar_fork + mutex = Mutex.new + condvar = ConditionVariable.new + thrs = (1..10).map do + Thread.new { mutex.synchronize { condvar.wait(mutex) } } + end + thrs.each { 3.times { Thread.pass } } + pid = fork do + mutex.synchronize { condvar.broadcast } + exit!(0) + end + _, s = Process.waitpid2(pid) + assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]' + until thrs.empty? + mutex.synchronize { condvar.broadcast } + thrs.delete_if { |t| t.join(0.01) } + end + end if Process.respond_to?(:fork) end diff --git a/test/thread/test_queue.rb b/test/thread/test_queue.rb index 4e6c9fa4c9..d69ecf92b2 100644 --- a/test/thread/test_queue.rb +++ b/test/thread/test_queue.rb @@ -565,4 +565,52 @@ class TestQueue < Test::Unit::TestCase puts 'exit' INPUT end + + def test_fork_while_queue_waiting + q = Queue.new + sq = SizedQueue.new(1) + thq = Thread.new { q.pop } + thsq = Thread.new { sq.pop } + Thread.pass until thq.stop? && thsq.stop? + + pid = fork do + exit!(1) if q.num_waiting != 0 + exit!(2) if sq.num_waiting != 0 + exit!(6) unless q.empty? + exit!(7) unless sq.empty? + q.push :child_q + sq.push :child_sq + exit!(3) if q.pop != :child_q + exit!(4) if sq.pop != :child_sq + exit!(0) + end + _, s = Process.waitpid2(pid) + assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]' + + q.push :thq + sq.push :thsq + assert_equal :thq, thq.value + assert_equal :thsq, thsq.value + + sq.push(1) + th = Thread.new { q.pop; sq.pop } + thsq = Thread.new { sq.push(2) } + Thread.pass until th.stop? && thsq.stop? + pid = fork do + exit!(1) if q.num_waiting != 0 + exit!(2) if sq.num_waiting != 0 + exit!(3) unless q.empty? + exit!(4) if sq.empty? + exit!(5) if sq.pop != 1 + exit!(0) + end + _, s = Process.waitpid2(pid) + assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]' + + assert_predicate thsq, :stop? + assert_equal 1, sq.pop + assert_same sq, thsq.value + q.push('restart th') + assert_equal 2, th.value + end if Process.respond_to?(:fork) end diff --git a/thread.c b/thread.c index ea0f526d2e..1caea4976c 100644 --- a/thread.c +++ b/thread.c @@ -4216,6 +4216,8 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r } rb_vm_living_threads_init(vm); rb_vm_living_threads_insert(vm, th); + rb_thread_sync_reset_all(); + vm->sleeper = 0; clear_coverage(); } diff --git a/thread_sync.c b/thread_sync.c index 8f68583a78..705fdb942d 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -4,6 +4,14 @@ static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; +/* + * keep these globally so we can walk and reinitialize them at fork + * in the child process + */ +static LIST_HEAD(szqueue_list); +static LIST_HEAD(queue_list); +static LIST_HEAD(condvar_list); + /* sync_waiter is always on-stack */ struct sync_waiter { rb_thread_t *th; @@ -54,6 +62,7 @@ typedef struct rb_mutex_struct { static void rb_mutex_abandon_all(rb_mutex_t *mutexes); static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); +static void rb_thread_sync_reset_all(void); #endif static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); @@ -538,7 +547,9 @@ void rb_mutex_allow_trap(VALUE self, int val) /* Queue */ #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq) +#define queue_live(q) UNALIGNED_MEMBER_PTR(q, live) PACKED_STRUCT_UNALIGNED(struct rb_queue { + struct list_node live; struct list_head waitq; const VALUE que; int num_waiting; @@ -546,6 +557,7 @@ PACKED_STRUCT_UNALIGNED(struct rb_queue { #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq) #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq) +#define szqueue_live(sq) UNALIGNED_MEMBER_PTR(sq, q.live) PACKED_STRUCT_UNALIGNED(struct rb_szqueue { struct rb_queue q; int num_waiting_push; @@ -562,6 +574,14 @@ queue_mark(void *ptr) rb_gc_mark(q->que); } +static void +queue_free(void *ptr) +{ + struct rb_queue *q = ptr; + list_del(queue_live(q)); + ruby_xfree(ptr); +} + static size_t queue_memsize(const void *ptr) { @@ -570,7 +590,7 @@ queue_memsize(const void *ptr) static const rb_data_type_t queue_data_type = { "queue", - {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,}, + {queue_mark, queue_free, queue_memsize,}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED }; @@ -582,6 +602,7 @@ queue_alloc(VALUE klass) obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q); list_head_init(queue_waitq(q)); + list_add(&queue_list, queue_live(q)); return obj; } @@ -604,6 +625,14 @@ szqueue_mark(void *ptr) queue_mark(&sq->q); } +static void +szqueue_free(void *ptr) +{ + struct rb_szqueue *sq = ptr; + list_del(szqueue_live(sq)); + ruby_xfree(ptr); +} + static size_t szqueue_memsize(const void *ptr) { @@ -612,7 +641,7 @@ szqueue_memsize(const void *ptr) static const rb_data_type_t szqueue_data_type = { "sized_queue", - {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,}, + {szqueue_mark, szqueue_free, szqueue_memsize,}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED }; @@ -624,6 +653,7 @@ szqueue_alloc(VALUE klass) &szqueue_data_type, sq); list_head_init(szqueue_waitq(sq)); list_head_init(szqueue_pushq(sq)); + list_add(&szqueue_list, szqueue_live(sq)); return obj; } @@ -878,7 +908,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) list_add_tail(&qw.as.q->waitq, &qw.w.node); qw.as.q->num_waiting++; - rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw); + rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); } } @@ -1120,7 +1150,7 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) list_add_tail(pushq, &qw.w.node); sq->num_waiting_push++; - rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw); + rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); } } @@ -1233,6 +1263,7 @@ rb_szqueue_empty_p(VALUE self) /* TODO: maybe this can be IMEMO */ struct rb_condvar { struct list_head waitq; + struct list_node live; }; /* @@ -1263,6 +1294,14 @@ struct rb_condvar { * } */ +static void +condvar_free(void *ptr) +{ + struct rb_condvar *cv = ptr; + list_del(&cv->live); + ruby_xfree(ptr); +} + static size_t condvar_memsize(const void *ptr) { @@ -1271,7 +1310,7 @@ condvar_memsize(const void *ptr) static const rb_data_type_t cv_data_type = { "condvar", - {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,}, + {0, condvar_free, condvar_memsize,}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED }; @@ -1293,6 +1332,7 @@ condvar_alloc(VALUE klass) obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv); list_head_init(&cv->waitq); + list_add(&condvar_list, &cv->live); return obj; } @@ -1406,6 +1446,31 @@ define_thread_class(VALUE outer, const char *name, VALUE super) return klass; } +#if defined(HAVE_WORKING_FORK) +/* we must not reference stacks of dead threads in a forked child */ +static void +rb_thread_sync_reset_all(void) +{ + struct rb_queue *q = 0; + struct rb_szqueue *sq = 0; + struct rb_condvar *cv = 0; + + list_for_each(&queue_list, q, live) { + list_head_init(queue_waitq(q)); + q->num_waiting = 0; + } + list_for_each(&szqueue_list, sq, q.live) { + list_head_init(szqueue_waitq(sq)); + list_head_init(szqueue_pushq(sq)); + sq->num_waiting_push = 0; + sq->q.num_waiting = 0; + } + list_for_each(&condvar_list, cv, live) { + list_head_init(&cv->waitq); + } +} +#endif + static void Init_thread_sync(void) {