From 9d0de48e662046d870e38a21771cbe939031051a Mon Sep 17 00:00:00 2001 From: ko1 Date: Wed, 28 Nov 2012 13:01:25 +0000 Subject: [PATCH] * include/ruby/thread.h (rb_thread_call_without_gvl2): change meaning of function. This function is called with same parameters of `rb_thread_call_without_gvl()'. However, if interrupts are detected, when return immediately. * thread.c: implement `rb_thread_call_without_gvl2()'. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@37938 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 10 +++ include/ruby/thread.h | 8 +- thread.c | 185 ++++++++++++++++++++++-------------------- 3 files changed, 114 insertions(+), 89 deletions(-) diff --git a/ChangeLog b/ChangeLog index 63f0e28a2e..6cca0aa330 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +Wed Nov 28 21:58:47 2012 Koichi Sasada + + * include/ruby/thread.h (rb_thread_call_without_gvl2): change + meaning of function. + This function is called with same parameters of + `rb_thread_call_without_gvl()'. + However, if interrupts are detected, when return immediately. + + * thread.c: implement `rb_thread_call_without_gvl2()'. + Wed Nov 28 21:31:21 2012 Masaya Tarui * thread.c (thread_join_sleep): check spurious wakeup by itself for diff --git a/include/ruby/thread.h b/include/ruby/thread.h index f3b1483a9c..fb9057c70f 100644 --- a/include/ruby/thread.h +++ b/include/ruby/thread.h @@ -26,12 +26,14 @@ extern "C" { #endif void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1); + void *rb_thread_call_without_gvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2); -void *rb_thread_call_without_gvl2(void *(*func)(void *, VALUE *), void *data1, - rb_unblock_function_t *ubf, void *data2); +void *rb_thread_call_without_gvl2(void *(*func)(void *), void *data1, + rb_unblock_function_t *ubf, void *data2); -#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS 0x01 +#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_AFTER 0x01 +#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_ #if defined __GNUC__ && __GNUC__ >= 4 #pragma GCC visibility pop diff --git a/thread.c b/thread.c index ae203b274d..371607b1fa 100644 --- a/thread.c +++ b/thread.c @@ -91,10 +91,12 @@ struct rb_blocking_region_buffer { struct rb_unblock_callback oldubf; }; -static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, - struct rb_unblock_callback *old); +static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, + struct rb_unblock_callback *old, int fail_if_interrupted); static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); +static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, + rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted); static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); #define RB_GC_SAVE_MACHINE_CONTEXT(th) \ @@ -113,23 +115,13 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio rb_thread_set_current(_th_stored); \ } while(0) -#define blocking_region_begin(th, region, func, arg) \ - do { \ - (region)->prev_status = (th)->status; \ - set_unblock_function((th), (func), (arg), &(region)->oldubf); \ - (th)->blocking_region_buffer = (region); \ - (th)->status = THREAD_STOPPED; \ - thread_debug("enter blocking region (%p)\n", (void *)(th)); \ - RB_GC_SAVE_MACHINE_CONTEXT(th); \ - gvl_release((th)->vm); \ - } while (0) - -#define BLOCKING_REGION(exec, ubf, ubfarg) do { \ +#define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \ rb_thread_t *__th = GET_THREAD(); \ struct rb_blocking_region_buffer __region; \ - blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \ - exec; \ - blocking_region_end(__th, &__region); \ + if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted)) { \ + exec; \ + blocking_region_end(__th, &__region); \ + }; \ } while(0) #if THREAD_DEBUG @@ -260,12 +252,20 @@ rb_thread_lock_destroy(rb_thread_lock_t *lock) native_mutex_destroy(lock); } -static void +static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, - struct rb_unblock_callback *old) + struct rb_unblock_callback *old, int fail_if_interrupted) { check_ints: - RUBY_VM_CHECK_INTS(th); /* check signal or so */ + if (fail_if_interrupted) { + if (RUBY_VM_INTERRUPTED_ANY(th)) { + return FALSE; + } + } + else { + RUBY_VM_CHECK_INTS(th); + } + native_mutex_lock(&th->interrupt_lock); if (RUBY_VM_INTERRUPTED_ANY(th)) { native_mutex_unlock(&th->interrupt_lock); @@ -277,6 +277,8 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, th->unblock.arg = arg; } native_mutex_unlock(&th->interrupt_lock); + + return TRUE; } static void @@ -1075,6 +1077,24 @@ rb_thread_schedule(void) /* blocking region */ +static inline int +blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, + rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted) +{ + region->prev_status = th->status; + if (set_unblock_function(th, ubf, arg, ®ion->oldubf, fail_if_interrupted)) { + th->blocking_region_buffer = region; + th->status = THREAD_STOPPED; + thread_debug("enter blocking region (%p)\n", (void *)th); + RB_GC_SAVE_MACHINE_CONTEXT(th); + gvl_release(th->vm); + return TRUE; + } + else { + return FALSE; + } +} + static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) { @@ -1094,7 +1114,7 @@ rb_thread_blocking_region_begin(void) { rb_thread_t *th = GET_THREAD(); struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer); - blocking_region_begin(th, region, ubf_select, th); + blocking_region_begin(th, region, ubf_select, th, FALSE); return region; } @@ -1109,24 +1129,54 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) errno = saved_errno; } +static void * +call_without_gvl(void *(*func)(void *), void *data1, + rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted) +{ + void *val = 0; + + rb_thread_t *th = GET_THREAD(); + int saved_errno = 0; + + th->waiting_fd = -1; + if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { + ubf = ubf_select; + data2 = th; + } + + BLOCKING_REGION({ + val = func(data1); + saved_errno = errno; + }, ubf, data2, fail_if_interrupted); + + if (!fail_if_interrupted) { + RUBY_VM_CHECK_INTS_BLOCKING(th); + } + + errno = saved_errno; + + return val; +} + /* * rb_thread_call_without_gvl - permit concurrent/parallel execution. - * rb_thread_call_without_gvl2 - permit concurrent/parallel execution with - * optional interrupt checking. + * rb_thread_call_without_gvl2 - permit concurrent/parallel execution + * without interrupt proceess. * * rb_thread_call_without_gvl() does: - * (1) release GVL. + * (1) Check interrupts. + * (2) release GVL. * Other Ruby threads may run in parallel. - * (2) call func with data1 - * (3) acquire GVL. + * (3) call func with data1 + * (4) acquire GVL. * Other Ruby threads can not run in parallel any more. - * (4) Check interrupts. + * (5) Check interrupts. * * rb_thread_call_without_gvl2() does: - * (1) release GVL. - * (2) call func with data1 and a pointer to the flags. - * (3) acquire GVL. - * (4) Check interrupts if (flags & RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS) is 0. + * (1) Check interrupt and return if interrupted. + * (2) release GVL. + * (3) call func with data1 and a pointer to the flags. + * (4) acquire GVL. * * If another thread interrupts this thread (Thread#kill, signal delivery, * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means @@ -1144,7 +1194,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) * provide proper ubf(), your program will not stop for Control+C or other * shutdown events. * - * "Check interrupts" on above list (4) means that check asynchronous + * "Check interrupts" on above list means that check asynchronous * interrupt events (such as Thread#kill, signal delivery, VM-shutdown * request, and so on) and call corresponding procedures * (such as `trap' for signals, raise an exception for Thread#raise). @@ -1161,16 +1211,16 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) * `read_func()' and interrupts are checked. However, if an interrupt occurs * at (c), after *read* operation is completed, check intterrupts is harmful * because it causes irrevocable side-effect, the read data will vanish. To - * avoid such problem, the `read_func()' should be: + * avoid such problem, the `read_func()' should be used with + * `rb_thread_call_without_gvl2()'. * - * read_func(void *data, VALUE *flags) { - * // (a) before read - * read(buffer); // (b) reading - * // (c) after read - * if (read is complete) { - * *flags |= RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS; - * } - * } + * If `rb_thread_call_without_gvl2()' detects interrupt, return its execution + * immediately. This function does not show when the execution was interrupted. + * For example, there are 4 possible timing (a), (b), (c) and before calling + * read_func(). You need to record progress of a read_func() and check + * the progress after `rb_thread_call_without_gvl2()'. You may need to call + * `rb_thread_check_ints()' correctly or your program can not process proper + * process such as `trap' and so on. * * NOTE: You can not execute most of Ruby C API and touch Ruby * objects in `func()' and `ubf()', including raising an @@ -1194,54 +1244,17 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) * they will work without GVL, and may acquire GVL when GC is needed. */ void * -rb_thread_call_without_gvl2(void *(*func)(void *data, VALUE *flags), void *data1, +rb_thread_call_without_gvl2(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2) { - void *val; - rb_thread_t *th = GET_THREAD(); - int saved_errno = 0; - VALUE flags = 0; - - th->waiting_fd = -1; - if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { - ubf = ubf_select; - data2 = th; - } - - BLOCKING_REGION({ - val = func(data1, &flags); - saved_errno = errno; - }, ubf, data2); - - if ((flags & RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS) == 0) { - RUBY_VM_CHECK_INTS_BLOCKING(th); - } - - errno = saved_errno; - - return val; -} - -struct without_gvl_wrapper_arg { - void *(*func)(void *data); - void *data; -}; - -static void * -without_gvl_wrapper(void *data, VALUE *flags) -{ - struct without_gvl_wrapper_arg *arg = (struct without_gvl_wrapper_arg*)data; - return arg->func(arg->data); + return call_without_gvl(func, data1, ubf, data2, TRUE); } void * rb_thread_call_without_gvl(void *(*func)(void *data), void *data1, rb_unblock_function_t *ubf, void *data2) { - struct without_gvl_wrapper_arg arg; - arg.func = func; - arg.data = data1; - return rb_thread_call_without_gvl2(without_gvl_wrapper, &arg, ubf, data2); + return call_without_gvl(func, data1, ubf, data2, FALSE); } VALUE @@ -1259,7 +1272,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) BLOCKING_REGION({ val = func(data1); saved_errno = errno; - }, ubf_select, th); + }, ubf_select, th, FALSE); } TH_POP_TAG(); @@ -1343,7 +1356,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) /* enter to Ruby world: You can access Ruby values, methods and so on. */ r = (*func)(data1); /* leave from Ruby world: You can not access Ruby values, etc. */ - blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg); + blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); return r; } @@ -3211,7 +3224,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, BLOCKING_REGION({ result = native_fd_select(n, read, write, except, timeout, th); if (result < 0) lerrno = errno; - }, ubf_select, th); + }, ubf_select, th, FALSE); RUBY_VM_CHECK_INTS_BLOCKING(th); @@ -3437,7 +3450,7 @@ retry: BLOCKING_REGION({ result = ppoll(&fds, 1, timeout, NULL); if (result < 0) lerrno = errno; - }, ubf_select, th); + }, ubf_select, th, FALSE); RUBY_VM_CHECK_INTS_BLOCKING(th); @@ -4161,7 +4174,7 @@ rb_mutex_lock(VALUE self) int timeout_ms = 0; struct rb_unblock_callback oldubf; - set_unblock_function(th, lock_interrupt, mutex, &oldubf); + set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE); th->status = THREAD_STOPPED_FOREVER; th->locking_mutex = self;