* include/ruby/thread.h (rb_thread_call_without_gvl2): change

meaning of function.
  This function is called with same parameters of
  `rb_thread_call_without_gvl()'.
  However, if interrupts are detected, when return immediately.
* thread.c: implement `rb_thread_call_without_gvl2()'.



git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@37938 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
ko1 2012-11-28 13:01:25 +00:00
Родитель f5dc27aa77
Коммит 9d0de48e66
3 изменённых файлов: 114 добавлений и 89 удалений

Просмотреть файл

@ -1,3 +1,13 @@
Wed Nov 28 21:58:47 2012 Koichi Sasada <ko1@atdot.net>
* include/ruby/thread.h (rb_thread_call_without_gvl2): change
meaning of function.
This function is called with same parameters of
`rb_thread_call_without_gvl()'.
However, if interrupts are detected, when return immediately.
* thread.c: implement `rb_thread_call_without_gvl2()'.
Wed Nov 28 21:31:21 2012 Masaya Tarui <tarui@ruby-lang.org>
* thread.c (thread_join_sleep): check spurious wakeup by itself for

Просмотреть файл

@ -26,12 +26,14 @@ extern "C" {
#endif
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
void *rb_thread_call_without_gvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2);
void *rb_thread_call_without_gvl2(void *(*func)(void *, VALUE *), void *data1,
rb_unblock_function_t *ubf, void *data2);
void *rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2);
#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS 0x01
#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_AFTER 0x01
#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_
#if defined __GNUC__ && __GNUC__ >= 4
#pragma GCC visibility pop

185
thread.c
Просмотреть файл

@ -91,10 +91,12 @@ struct rb_blocking_region_buffer {
struct rb_unblock_callback oldubf;
};
static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
struct rb_unblock_callback *old);
static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
struct rb_unblock_callback *old, int fail_if_interrupted);
static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old);
static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
#define RB_GC_SAVE_MACHINE_CONTEXT(th) \
@ -113,23 +115,13 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
rb_thread_set_current(_th_stored); \
} while(0)
#define blocking_region_begin(th, region, func, arg) \
do { \
(region)->prev_status = (th)->status; \
set_unblock_function((th), (func), (arg), &(region)->oldubf); \
(th)->blocking_region_buffer = (region); \
(th)->status = THREAD_STOPPED; \
thread_debug("enter blocking region (%p)\n", (void *)(th)); \
RB_GC_SAVE_MACHINE_CONTEXT(th); \
gvl_release((th)->vm); \
} while (0)
#define BLOCKING_REGION(exec, ubf, ubfarg) do { \
#define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \
rb_thread_t *__th = GET_THREAD(); \
struct rb_blocking_region_buffer __region; \
blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \
exec; \
blocking_region_end(__th, &__region); \
if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted)) { \
exec; \
blocking_region_end(__th, &__region); \
}; \
} while(0)
#if THREAD_DEBUG
@ -260,12 +252,20 @@ rb_thread_lock_destroy(rb_thread_lock_t *lock)
native_mutex_destroy(lock);
}
static void
static int
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
struct rb_unblock_callback *old)
struct rb_unblock_callback *old, int fail_if_interrupted)
{
check_ints:
RUBY_VM_CHECK_INTS(th); /* check signal or so */
if (fail_if_interrupted) {
if (RUBY_VM_INTERRUPTED_ANY(th)) {
return FALSE;
}
}
else {
RUBY_VM_CHECK_INTS(th);
}
native_mutex_lock(&th->interrupt_lock);
if (RUBY_VM_INTERRUPTED_ANY(th)) {
native_mutex_unlock(&th->interrupt_lock);
@ -277,6 +277,8 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
th->unblock.arg = arg;
}
native_mutex_unlock(&th->interrupt_lock);
return TRUE;
}
static void
@ -1075,6 +1077,24 @@ rb_thread_schedule(void)
/* blocking region */
static inline int
blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
{
region->prev_status = th->status;
if (set_unblock_function(th, ubf, arg, &region->oldubf, fail_if_interrupted)) {
th->blocking_region_buffer = region;
th->status = THREAD_STOPPED;
thread_debug("enter blocking region (%p)\n", (void *)th);
RB_GC_SAVE_MACHINE_CONTEXT(th);
gvl_release(th->vm);
return TRUE;
}
else {
return FALSE;
}
}
static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
@ -1094,7 +1114,7 @@ rb_thread_blocking_region_begin(void)
{
rb_thread_t *th = GET_THREAD();
struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer);
blocking_region_begin(th, region, ubf_select, th);
blocking_region_begin(th, region, ubf_select, th, FALSE);
return region;
}
@ -1109,24 +1129,54 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
errno = saved_errno;
}
static void *
call_without_gvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted)
{
void *val = 0;
rb_thread_t *th = GET_THREAD();
int saved_errno = 0;
th->waiting_fd = -1;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
data2 = th;
}
BLOCKING_REGION({
val = func(data1);
saved_errno = errno;
}, ubf, data2, fail_if_interrupted);
if (!fail_if_interrupted) {
RUBY_VM_CHECK_INTS_BLOCKING(th);
}
errno = saved_errno;
return val;
}
/*
* rb_thread_call_without_gvl - permit concurrent/parallel execution.
* rb_thread_call_without_gvl2 - permit concurrent/parallel execution with
* optional interrupt checking.
* rb_thread_call_without_gvl2 - permit concurrent/parallel execution
* without interrupt proceess.
*
* rb_thread_call_without_gvl() does:
* (1) release GVL.
* (1) Check interrupts.
* (2) release GVL.
* Other Ruby threads may run in parallel.
* (2) call func with data1
* (3) acquire GVL.
* (3) call func with data1
* (4) acquire GVL.
* Other Ruby threads can not run in parallel any more.
* (4) Check interrupts.
* (5) Check interrupts.
*
* rb_thread_call_without_gvl2() does:
* (1) release GVL.
* (2) call func with data1 and a pointer to the flags.
* (3) acquire GVL.
* (4) Check interrupts if (flags & RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS) is 0.
* (1) Check interrupt and return if interrupted.
* (2) release GVL.
* (3) call func with data1 and a pointer to the flags.
* (4) acquire GVL.
*
* If another thread interrupts this thread (Thread#kill, signal delivery,
* VM-shutdown request, and so on), `ubf()' is called (`ubf()' means
@ -1144,7 +1194,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
* provide proper ubf(), your program will not stop for Control+C or other
* shutdown events.
*
* "Check interrupts" on above list (4) means that check asynchronous
* "Check interrupts" on above list means that check asynchronous
* interrupt events (such as Thread#kill, signal delivery, VM-shutdown
* request, and so on) and call corresponding procedures
* (such as `trap' for signals, raise an exception for Thread#raise).
@ -1161,16 +1211,16 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
* `read_func()' and interrupts are checked. However, if an interrupt occurs
* at (c), after *read* operation is completed, check intterrupts is harmful
* because it causes irrevocable side-effect, the read data will vanish. To
* avoid such problem, the `read_func()' should be:
* avoid such problem, the `read_func()' should be used with
* `rb_thread_call_without_gvl2()'.
*
* read_func(void *data, VALUE *flags) {
* // (a) before read
* read(buffer); // (b) reading
* // (c) after read
* if (read is complete) {
* *flags |= RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS;
* }
* }
* If `rb_thread_call_without_gvl2()' detects interrupt, return its execution
* immediately. This function does not show when the execution was interrupted.
* For example, there are 4 possible timing (a), (b), (c) and before calling
* read_func(). You need to record progress of a read_func() and check
* the progress after `rb_thread_call_without_gvl2()'. You may need to call
* `rb_thread_check_ints()' correctly or your program can not process proper
* process such as `trap' and so on.
*
* NOTE: You can not execute most of Ruby C API and touch Ruby
* objects in `func()' and `ubf()', including raising an
@ -1194,54 +1244,17 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
* they will work without GVL, and may acquire GVL when GC is needed.
*/
void *
rb_thread_call_without_gvl2(void *(*func)(void *data, VALUE *flags), void *data1,
rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2)
{
void *val;
rb_thread_t *th = GET_THREAD();
int saved_errno = 0;
VALUE flags = 0;
th->waiting_fd = -1;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
data2 = th;
}
BLOCKING_REGION({
val = func(data1, &flags);
saved_errno = errno;
}, ubf, data2);
if ((flags & RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS) == 0) {
RUBY_VM_CHECK_INTS_BLOCKING(th);
}
errno = saved_errno;
return val;
}
struct without_gvl_wrapper_arg {
void *(*func)(void *data);
void *data;
};
static void *
without_gvl_wrapper(void *data, VALUE *flags)
{
struct without_gvl_wrapper_arg *arg = (struct without_gvl_wrapper_arg*)data;
return arg->func(arg->data);
return call_without_gvl(func, data1, ubf, data2, TRUE);
}
void *
rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
rb_unblock_function_t *ubf, void *data2)
{
struct without_gvl_wrapper_arg arg;
arg.func = func;
arg.data = data1;
return rb_thread_call_without_gvl2(without_gvl_wrapper, &arg, ubf, data2);
return call_without_gvl(func, data1, ubf, data2, FALSE);
}
VALUE
@ -1259,7 +1272,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
BLOCKING_REGION({
val = func(data1);
saved_errno = errno;
}, ubf_select, th);
}, ubf_select, th, FALSE);
}
TH_POP_TAG();
@ -1343,7 +1356,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
/* enter to Ruby world: You can access Ruby values, methods and so on. */
r = (*func)(data1);
/* leave from Ruby world: You can not access Ruby values, etc. */
blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg);
blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
return r;
}
@ -3211,7 +3224,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
BLOCKING_REGION({
result = native_fd_select(n, read, write, except, timeout, th);
if (result < 0) lerrno = errno;
}, ubf_select, th);
}, ubf_select, th, FALSE);
RUBY_VM_CHECK_INTS_BLOCKING(th);
@ -3437,7 +3450,7 @@ retry:
BLOCKING_REGION({
result = ppoll(&fds, 1, timeout, NULL);
if (result < 0) lerrno = errno;
}, ubf_select, th);
}, ubf_select, th, FALSE);
RUBY_VM_CHECK_INTS_BLOCKING(th);
@ -4161,7 +4174,7 @@ rb_mutex_lock(VALUE self)
int timeout_ms = 0;
struct rb_unblock_callback oldubf;
set_unblock_function(th, lock_interrupt, mutex, &oldubf);
set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE);
th->status = THREAD_STOPPED_FOREVER;
th->locking_mutex = self;