зеркало из https://github.com/github/ruby.git
* thread.c (rb_thread_s_control_interrupt,
rb_thread_s_check_interrupt): added for Thread.control_intgerrupt and Thread.check_interrupt. See details on rdoc. I'll make an ticket for this feature. * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt. * thread.c (rb_threadptr_raise): make a new exception object even if argc is 0. * thread.c (rb_thread_kill): kill thread immediately if target thread is current thread. * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. CHECK_INTS while/after blocking operation. * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). * eval.c (ruby_cleanup): ditto. * insns.def: ditto. * process.c (rb_waitpid): ditto. * vm_eval.c (vm_call0): ditto. * vm_insnhelper.c (vm_call_method): ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36470 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
Родитель
422e8d5adc
Коммит
f4a8db647a
33
ChangeLog
33
ChangeLog
|
@ -1,3 +1,36 @@
|
|||
Thu Jul 19 15:08:40 2012 Koichi Sasada <ko1@atdot.net>
|
||||
|
||||
* thread.c (rb_thread_s_control_interrupt,
|
||||
rb_thread_s_check_interrupt): added for
|
||||
Thread.control_intgerrupt and Thread.check_interrupt.
|
||||
See details on rdoc.
|
||||
I'll make an ticket for this feature.
|
||||
|
||||
* test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt.
|
||||
|
||||
* thread.c (rb_threadptr_raise): make a new exception object
|
||||
even if argc is 0.
|
||||
|
||||
* thread.c (rb_thread_kill): kill thread immediately if target thread
|
||||
is current thread.
|
||||
|
||||
* vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added.
|
||||
CHECK_INTS while/after blocking operation.
|
||||
|
||||
* vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr.
|
||||
|
||||
* cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS().
|
||||
|
||||
* eval.c (ruby_cleanup): ditto.
|
||||
|
||||
* insns.def: ditto.
|
||||
|
||||
* process.c (rb_waitpid): ditto.
|
||||
|
||||
* vm_eval.c (vm_call0): ditto.
|
||||
|
||||
* vm_insnhelper.c (vm_call_method): ditto.
|
||||
|
||||
Thu Jul 19 22:46:48 2012 Tanaka Akira <akr@fsij.org>
|
||||
|
||||
* test/ruby/test_io.rb: remove temporally files early.
|
||||
|
|
2
cont.c
2
cont.c
|
@ -1328,7 +1328,7 @@ fiber_switch(VALUE fibval, int argc, VALUE *argv, int is_resume)
|
|||
rb_bug("rb_fiber_resume: unreachable");
|
||||
}
|
||||
#endif
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
|
||||
return value;
|
||||
}
|
||||
|
|
2
eval.c
2
eval.c
|
@ -160,7 +160,7 @@ ruby_cleanup(volatile int ex)
|
|||
rb_threadptr_check_signal(th);
|
||||
PUSH_TAG();
|
||||
if ((state = EXEC_TAG()) == 0) {
|
||||
SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(); });
|
||||
SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(th); });
|
||||
}
|
||||
POP_TAG();
|
||||
|
||||
|
|
12
insns.def
12
insns.def
|
@ -1086,7 +1086,7 @@ leave
|
|||
}
|
||||
}
|
||||
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
|
||||
if (UNLIKELY(VM_FRAME_TYPE_FINISH_P(GET_CFP()))) {
|
||||
#if OPT_CALL_THREADED_CODE
|
||||
|
@ -1117,7 +1117,7 @@ throw
|
|||
(VALUE throwobj)
|
||||
(VALUE val)
|
||||
{
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
val = vm_throw(th, GET_CFP(), throw_state, throwobj);
|
||||
THROW_EXCEPTION(val);
|
||||
/* unreachable */
|
||||
|
@ -1138,7 +1138,7 @@ jump
|
|||
()
|
||||
()
|
||||
{
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
JUMP(dst);
|
||||
}
|
||||
|
||||
|
@ -1154,7 +1154,7 @@ branchif
|
|||
()
|
||||
{
|
||||
if (RTEST(val)) {
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
JUMP(dst);
|
||||
}
|
||||
}
|
||||
|
@ -1171,7 +1171,7 @@ branchunless
|
|||
()
|
||||
{
|
||||
if (!RTEST(val)) {
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
JUMP(dst);
|
||||
}
|
||||
}
|
||||
|
@ -1220,7 +1220,7 @@ onceinlinecache
|
|||
}
|
||||
else if (ic->ic_value.value == Qundef)
|
||||
{
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
rb_thread_schedule();
|
||||
goto retry;
|
||||
}
|
||||
|
|
|
@ -663,7 +663,7 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
|
|||
RUBY_UBF_PROCESS, 0);
|
||||
if (result < 0) {
|
||||
if (errno == EINTR) {
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(GET_THREAD());
|
||||
goto retry;
|
||||
}
|
||||
return (rb_pid_t)-1;
|
||||
|
|
|
@ -615,6 +615,72 @@ class TestThread < Test::Unit::TestCase
|
|||
end
|
||||
assert_equal("Can't call on top of Fiber or Thread", error.message, bug5083)
|
||||
end
|
||||
|
||||
def make_control_interrupt_test_thread1 flag
|
||||
r = []
|
||||
q = Queue.new
|
||||
th = Thread.new{
|
||||
begin
|
||||
Thread.control_interrupt(RuntimeError => flag){
|
||||
q << :go
|
||||
begin
|
||||
sleep 0.5
|
||||
rescue
|
||||
r << :c1
|
||||
end
|
||||
}
|
||||
sleep 0.5
|
||||
rescue
|
||||
r << :c2
|
||||
end
|
||||
}
|
||||
q.pop # wait
|
||||
th.raise
|
||||
begin
|
||||
th.join
|
||||
rescue
|
||||
r << :c3
|
||||
end
|
||||
r
|
||||
end
|
||||
|
||||
def test_control_interrupt
|
||||
[[:never, :c2],
|
||||
[:immediate, :c1],
|
||||
[:on_blocking, :c1]].each{|(flag, c)|
|
||||
assert_equal([flag, c], [flag] + make_control_interrupt_test_thread1(flag))
|
||||
}
|
||||
# TODO: complex cases are needed.
|
||||
end
|
||||
|
||||
def test_check_interrupt
|
||||
q = Queue.new
|
||||
Thread.control_interrupt(RuntimeError => :never){
|
||||
th = Thread.new{
|
||||
q.push :e
|
||||
begin
|
||||
begin
|
||||
sleep 0.5
|
||||
rescue => e
|
||||
q.push :ng1
|
||||
end
|
||||
begin
|
||||
Thread.check_interrupt
|
||||
rescue => e
|
||||
q.push :ok
|
||||
end
|
||||
rescue => e
|
||||
q.push :ng2
|
||||
ensure
|
||||
q.push :ng3
|
||||
end
|
||||
}
|
||||
q.pop
|
||||
th.raise
|
||||
th.join
|
||||
assert_equal(:ok, q.pop)
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
class TestThreadGroup < Test::Unit::TestCase
|
||||
|
|
515
thread.c
515
thread.c
|
@ -265,7 +265,7 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
|
|||
struct rb_unblock_callback *old)
|
||||
{
|
||||
check_ints:
|
||||
RUBY_VM_CHECK_INTS(); /* check signal or so */
|
||||
RUBY_VM_CHECK_INTS(th); /* check signal or so */
|
||||
native_mutex_lock(&th->interrupt_lock);
|
||||
if (th->interrupt_flag) {
|
||||
native_mutex_unlock(&th->interrupt_lock);
|
||||
|
@ -545,7 +545,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
|
|||
static VALUE
|
||||
thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
|
||||
{
|
||||
rb_thread_t *th;
|
||||
rb_thread_t *th, *current_th = GET_THREAD();
|
||||
int err;
|
||||
|
||||
if (OBJ_FROZEN(GET_THREAD()->thgroup)) {
|
||||
|
@ -559,12 +559,12 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
|
|||
th->first_proc = fn ? Qfalse : rb_block_proc();
|
||||
th->first_args = args; /* GC: shouldn't put before above line */
|
||||
|
||||
th->priority = GET_THREAD()->priority;
|
||||
th->thgroup = GET_THREAD()->thgroup;
|
||||
th->priority = current_th->priority;
|
||||
th->thgroup = current_th->thgroup;
|
||||
|
||||
th->async_errinfo_queue = rb_ary_new();
|
||||
th->async_errinfo_queue_checked = 0;
|
||||
th->async_errinfo_mask_stack = rb_ary_new();
|
||||
th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack);
|
||||
|
||||
native_mutex_initialize(&th->interrupt_lock);
|
||||
if (GET_VM()->event_hooks != NULL)
|
||||
|
@ -859,7 +859,7 @@ sleep_forever(rb_thread_t *th, int deadlockable)
|
|||
if (deadlockable) {
|
||||
th->vm->sleeper--;
|
||||
}
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
} while (th->status == status);
|
||||
th->status = prev_status;
|
||||
}
|
||||
|
@ -896,7 +896,7 @@ sleep_timeval(rb_thread_t *th, struct timeval tv)
|
|||
th->status = THREAD_STOPPED;
|
||||
do {
|
||||
native_sleep(th, &tv);
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
getclockofday(&tvn);
|
||||
if (to.tv_sec < tvn.tv_sec) break;
|
||||
if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
|
||||
|
@ -968,9 +968,9 @@ rb_thread_wait_for(struct timeval time)
|
|||
void
|
||||
rb_thread_polling(void)
|
||||
{
|
||||
RUBY_VM_CHECK_INTS();
|
||||
if (!rb_thread_alone()) {
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
sleep_for_polling(th);
|
||||
}
|
||||
}
|
||||
|
@ -985,7 +985,7 @@ rb_thread_polling(void)
|
|||
void
|
||||
rb_thread_check_ints(void)
|
||||
{
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD());
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1013,7 +1013,7 @@ rb_thread_sleep(int sec)
|
|||
rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
|
||||
}
|
||||
|
||||
static void rb_threadptr_execute_interrupts_common(rb_thread_t *);
|
||||
static void rb_threadptr_execute_interrupts_common(rb_thread_t *, int blocking);
|
||||
|
||||
static void
|
||||
rb_thread_schedule_limits(unsigned long limits_us)
|
||||
|
@ -1040,7 +1040,7 @@ rb_thread_schedule(void)
|
|||
rb_thread_schedule_limits(0);
|
||||
|
||||
if (UNLIKELY(GET_THREAD()->interrupt_flag)) {
|
||||
rb_threadptr_execute_interrupts_common(GET_THREAD());
|
||||
rb_threadptr_execute_interrupts_common(GET_THREAD(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1076,7 +1076,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
|
|||
rb_thread_t *th = GET_THREAD();
|
||||
blocking_region_end(th, region);
|
||||
xfree(region);
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
errno = saved_errno;
|
||||
}
|
||||
|
||||
|
@ -1181,7 +1181,7 @@ rb_thread_call_without_gvl2(void *(*func)(void *data, int *skip_checkints), void
|
|||
}, ubf, data2);
|
||||
|
||||
if (!skip_checkints) {
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
}
|
||||
|
||||
errno = saved_errno;
|
||||
|
@ -1237,7 +1237,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
|||
JUMP_TAG(state);
|
||||
}
|
||||
/* TODO: check func() */
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
|
||||
errno = saved_errno;
|
||||
|
||||
|
@ -1350,86 +1350,6 @@ thread_s_pass(VALUE klass)
|
|||
return Qnil;
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
|
||||
static void
|
||||
rb_threadptr_execute_interrupts_common(rb_thread_t *th)
|
||||
{
|
||||
rb_atomic_t interrupt;
|
||||
|
||||
if (th->raised_flag) return;
|
||||
|
||||
while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) {
|
||||
enum rb_thread_status status = th->status;
|
||||
int timer_interrupt = interrupt & 0x01;
|
||||
int finalizer_interrupt = interrupt & 0x04;
|
||||
int sig;
|
||||
|
||||
th->status = THREAD_RUNNABLE;
|
||||
|
||||
/* signal handling */
|
||||
if (th == th->vm->main_thread) {
|
||||
while ((sig = rb_get_next_signal()) != 0) {
|
||||
rb_signal_exec(th, sig);
|
||||
}
|
||||
}
|
||||
|
||||
/* exception from another thread */
|
||||
if (rb_threadptr_async_errinfo_active_p(th)) {
|
||||
VALUE err = rb_threadptr_async_errinfo_deque(th);
|
||||
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
|
||||
|
||||
if (err == eKillSignal /* Thread#kill receieved */ ||
|
||||
err == eTerminateSignal /* Terminate thread */ ) {
|
||||
rb_threadptr_async_errinfo_clear(th);
|
||||
th->status = THREAD_TO_KILL;
|
||||
th->errinfo = INT2FIX(TAG_FATAL);
|
||||
TH_JUMP_TAG(th, TAG_FATAL);
|
||||
}
|
||||
else {
|
||||
rb_exc_raise(err);
|
||||
}
|
||||
}
|
||||
th->status = status;
|
||||
|
||||
if (finalizer_interrupt) {
|
||||
rb_gc_finalize_deferred();
|
||||
}
|
||||
|
||||
if (timer_interrupt) {
|
||||
unsigned long limits_us = TIME_QUANTUM_USEC;
|
||||
|
||||
if (th->priority > 0)
|
||||
limits_us <<= th->priority;
|
||||
else
|
||||
limits_us >>= -th->priority;
|
||||
|
||||
if (status == THREAD_RUNNABLE)
|
||||
th->running_time_us += TIME_QUANTUM_USEC;
|
||||
|
||||
EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0);
|
||||
|
||||
rb_thread_schedule_limits(limits_us);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
rb_threadptr_execute_interrupts(rb_thread_t *th)
|
||||
{
|
||||
rb_threadptr_execute_interrupts_common(th);
|
||||
}
|
||||
|
||||
void
|
||||
rb_thread_execute_interrupts(VALUE thval)
|
||||
{
|
||||
rb_thread_t *th;
|
||||
GetThreadPtr(thval, th);
|
||||
rb_threadptr_execute_interrupts_common(th);
|
||||
}
|
||||
|
||||
/*****************************************************/
|
||||
|
||||
/*
|
||||
|
@ -1461,27 +1381,391 @@ rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
|
|||
th->async_errinfo_queue_checked = 0;
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_threadptr_async_errinfo_deque(rb_thread_t *th)
|
||||
enum interrupt_timing {
|
||||
INTERRUPT_NONE,
|
||||
INTERRUPT_IMMEDIATE,
|
||||
INTERRUPT_ON_BLOCKING,
|
||||
INTERRUPT_NEVER
|
||||
};
|
||||
|
||||
static enum interrupt_timing
|
||||
rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err)
|
||||
{
|
||||
VALUE mask;
|
||||
long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack);
|
||||
VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack);
|
||||
VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */
|
||||
long ancestors_len = RARRAY_LEN(ancestors);
|
||||
VALUE *ancestors_ptr = RARRAY_PTR(ancestors);
|
||||
int i, j;
|
||||
|
||||
for (i=0; i<mask_stack_len; i++) {
|
||||
mask = mask_stack[mask_stack_len-(i+1)];
|
||||
|
||||
for (j=0; j<ancestors_len; j++) {
|
||||
VALUE klass = ancestors_ptr[j];
|
||||
VALUE sym;
|
||||
|
||||
/* TODO: remove rb_intern() */
|
||||
if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
|
||||
if (sym == ID2SYM(rb_intern("immediate"))) {
|
||||
return INTERRUPT_IMMEDIATE;
|
||||
}
|
||||
else if (sym == ID2SYM(rb_intern("on_blocking"))) {
|
||||
return INTERRUPT_ON_BLOCKING;
|
||||
}
|
||||
else if (sym == ID2SYM(rb_intern("never"))) {
|
||||
return INTERRUPT_NEVER;
|
||||
}
|
||||
else {
|
||||
rb_raise(rb_eThreadError, "unknown mask signature");
|
||||
}
|
||||
}
|
||||
}
|
||||
/* try to next mask */
|
||||
}
|
||||
return INTERRUPT_NONE;
|
||||
}
|
||||
|
||||
static int
|
||||
rb_threadptr_async_errinfo_empty_p(rb_thread_t *th)
|
||||
{
|
||||
return RARRAY_LEN(th->async_errinfo_queue) == 0;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum interrupt_timing timing)
|
||||
{
|
||||
#if 1 /* 1 to enable Thread#control_interrupt, 0 to ignore it */
|
||||
int i;
|
||||
|
||||
for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) {
|
||||
VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i];
|
||||
|
||||
enum interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err));
|
||||
|
||||
switch (mask_timing) {
|
||||
case INTERRUPT_ON_BLOCKING:
|
||||
if (timing != INTERRUPT_ON_BLOCKING) {
|
||||
break;
|
||||
}
|
||||
/* fall through */
|
||||
case INTERRUPT_NONE: /* default: IMMEDIATE */
|
||||
case INTERRUPT_IMMEDIATE:
|
||||
rb_ary_delete_at(th->async_errinfo_queue, i);
|
||||
return err;
|
||||
case INTERRUPT_NEVER:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
th->async_errinfo_queue_checked = 1;
|
||||
return Qundef;
|
||||
#else
|
||||
VALUE err = rb_ary_shift(th->async_errinfo_queue);
|
||||
if (RARRAY_LEN(th->async_errinfo_queue) == 0) {
|
||||
if (rb_threadptr_async_errinfo_empty_p(th)) {
|
||||
th->async_errinfo_queue_checked = 1;
|
||||
}
|
||||
return err;
|
||||
#endif
|
||||
}
|
||||
|
||||
int
|
||||
rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
|
||||
{
|
||||
if (th->async_errinfo_queue_checked) {
|
||||
if (th->async_errinfo_queue_checked || rb_threadptr_async_errinfo_empty_p(th)) {
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
return RARRAY_LEN(th->async_errinfo_queue) > 0;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
static VALUE
|
||||
rb_threadptr_interrupt_mask(rb_thread_t *th, VALUE mask, VALUE (*func)(rb_thread_t *th))
|
||||
{
|
||||
VALUE r = Qnil;
|
||||
int state;
|
||||
|
||||
rb_ary_push(th->async_errinfo_mask_stack, mask);
|
||||
if (!rb_threadptr_async_errinfo_empty_p(th)) {
|
||||
th->async_errinfo_queue_checked = 0;
|
||||
RUBY_VM_SET_INTERRUPT(th);
|
||||
}
|
||||
|
||||
TH_PUSH_TAG(th);
|
||||
if ((state = EXEC_TAG()) == 0) {
|
||||
r = func(th);
|
||||
}
|
||||
TH_POP_TAG();
|
||||
|
||||
rb_ary_pop(th->async_errinfo_mask_stack);
|
||||
if (!rb_threadptr_async_errinfo_empty_p(th)) {
|
||||
th->async_errinfo_queue_checked = 0;
|
||||
RUBY_VM_SET_INTERRUPT(th);
|
||||
}
|
||||
|
||||
if (state) {
|
||||
JUMP_TAG(state);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* Thread.control_interrupt(hash) { ... } -> result of the block
|
||||
*
|
||||
* Thread.control_interrupt controls interrupt timing.
|
||||
*
|
||||
* _interrupt_ means asynchronous event and corresponding procedure
|
||||
* by Thread#raise, Thread#kill, signal trap (not supported yet)
|
||||
* and main thread termination (if main thread terminates, then all
|
||||
* other thread will be killed).
|
||||
*
|
||||
* _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol
|
||||
* is one of them:
|
||||
* - :immediate Invoke interrupt immediately.
|
||||
* - :on_blocking Invoke interrupt while _BlockingOperation_.
|
||||
* - :never Never invoke interrupt.
|
||||
*
|
||||
* _BlockingOperation_ means that the operation will block the calling thread,
|
||||
* such as read and write. On CRuby implementation, _BlockingOperation_ is
|
||||
* operation executed without GVL.
|
||||
*
|
||||
* Masked interrupts are delayed until they are enabled.
|
||||
* This method is similar to sigprocmask(3).
|
||||
*
|
||||
* TODO (DOC): control_interrupt is stacked.
|
||||
* TODO (DOC): check ancestors.
|
||||
* TODO (DOC): to prevent all interrupt, {Object => :never} works.
|
||||
*
|
||||
* NOTE: Asynchronous interrupts are difficult to use.
|
||||
* If you need to communicate between threads,
|
||||
* please consider to use another way such as Queue.
|
||||
* Or use them with deep understanding about this method.
|
||||
*
|
||||
*
|
||||
* # example: Guard from Thread#raise
|
||||
* th = Thread.new do
|
||||
* Thead.control_interrupt(RuntimeError => :never) {
|
||||
* begin
|
||||
* # Thread#raise doesn't interrupt here.
|
||||
* # You can write resource allocation code safely.
|
||||
* Thread.control_interrupt(RuntimeError => :immediate) {
|
||||
* # ...
|
||||
* # It is possible to be interrupted by Thread#raise.
|
||||
* }
|
||||
* ensure
|
||||
* # Thread#raise doesn't interrupt here.
|
||||
* # You can write resource dealocation code safely.
|
||||
* end
|
||||
* }
|
||||
* end
|
||||
* Thread.pass
|
||||
* # ...
|
||||
* th.raise "stop"
|
||||
*
|
||||
* # example: Guard from TimeoutError
|
||||
* require 'timeout'
|
||||
* Thread.control_interrupt(TimeoutError => :never) {
|
||||
* timeout(10){
|
||||
* # TimeoutError doesn't occur here
|
||||
* Thread.control_interrupt(TimeoutError => :on_blocking) {
|
||||
* # possible to be killed by TimeoutError
|
||||
* # while blocking operation
|
||||
* }
|
||||
* # TimeoutError doesn't occur here
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* # example: Stack control settings
|
||||
* Thread.control_interrupt(FooError => :never) {
|
||||
* Thread.control_interrupt(BarError => :never) {
|
||||
* # FooError and BarError are prohibited.
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* # example: check ancestors
|
||||
* Thread.control_interrupt(Exception => :never) {
|
||||
* # all exceptions inherited from Exception are prohibited.
|
||||
* }
|
||||
*
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
control_interrupt_func(rb_thread_t *th)
|
||||
{
|
||||
return rb_yield(Qnil);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
rb_thread_s_control_interrupt(VALUE self, VALUE mask_arg)
|
||||
{
|
||||
if (!rb_block_given_p()) {
|
||||
rb_raise(rb_eArgError, "block is needed.");
|
||||
}
|
||||
|
||||
return rb_threadptr_interrupt_mask(GET_THREAD(),
|
||||
rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"),
|
||||
control_interrupt_func);
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* Thread.check_interrupt() -> nil
|
||||
*
|
||||
* Check queued interrupts.
|
||||
*
|
||||
* If there are queued interrupts, process respective procedures.
|
||||
*
|
||||
* This method can be defined as the following Ruby code:
|
||||
*
|
||||
* def Thread.check_interrupt
|
||||
* Thread.control_interrupt(Object => :immediate) {
|
||||
* Thread.pass
|
||||
* }
|
||||
* end
|
||||
*
|
||||
* Examples:
|
||||
*
|
||||
* th = Thread.new{
|
||||
* Thread.control_interrupt(RuntimeError => :on_blocking){
|
||||
* while true
|
||||
* ...
|
||||
* # reach safe point to invoke interrupt
|
||||
* Thread.check_interrupt
|
||||
* ...
|
||||
* end
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
* th.raise # stop thread
|
||||
*
|
||||
* NOTE: This example can be described by the another code.
|
||||
* You need to keep to avoid asynchronous interrupts.
|
||||
*
|
||||
* flag = true
|
||||
* th = Thread.new{
|
||||
* Thread.control_interrupt(RuntimeError => :on_blocking){
|
||||
* while true
|
||||
* ...
|
||||
* # reach safe point to invoke interrupt
|
||||
* break if flag == false
|
||||
* ...
|
||||
* end
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
* flag = false # stop thread
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
check_interrupt_func(rb_thread_t *th)
|
||||
{
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
rb_thread_s_check_interrupt(VALUE self)
|
||||
{
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
|
||||
if (!rb_threadptr_async_errinfo_empty_p(th)) {
|
||||
VALUE mask = rb_hash_new();
|
||||
rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("immediate")));
|
||||
rb_threadptr_interrupt_mask(GET_THREAD(), mask, check_interrupt_func);
|
||||
}
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
static void
|
||||
rb_threadptr_to_kill(rb_thread_t *th)
|
||||
{
|
||||
rb_threadptr_async_errinfo_clear(th);
|
||||
th->status = THREAD_TO_KILL;
|
||||
th->errinfo = INT2FIX(TAG_FATAL);
|
||||
TH_JUMP_TAG(th, TAG_FATAL);
|
||||
}
|
||||
|
||||
static void
|
||||
rb_threadptr_execute_interrupts_common(rb_thread_t *th, int blocking_timing)
|
||||
{
|
||||
rb_atomic_t interrupt;
|
||||
|
||||
if (th->raised_flag) return;
|
||||
|
||||
while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) {
|
||||
enum rb_thread_status status = th->status;
|
||||
int timer_interrupt = interrupt & 0x01;
|
||||
int finalizer_interrupt = interrupt & 0x04;
|
||||
int sig;
|
||||
|
||||
th->status = THREAD_RUNNABLE;
|
||||
|
||||
/* signal handling */
|
||||
if (th == th->vm->main_thread) {
|
||||
while ((sig = rb_get_next_signal()) != 0) {
|
||||
rb_signal_exec(th, sig);
|
||||
}
|
||||
}
|
||||
|
||||
/* exception from another thread */
|
||||
if (rb_threadptr_async_errinfo_active_p(th)) {
|
||||
VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
|
||||
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
|
||||
|
||||
if (err == Qundef) {
|
||||
/* no error */
|
||||
}
|
||||
else if (err == eKillSignal /* Thread#kill receieved */ ||
|
||||
err == eTerminateSignal /* Terminate thread */ ) {
|
||||
rb_threadptr_to_kill(th);
|
||||
}
|
||||
else {
|
||||
rb_exc_raise(err);
|
||||
}
|
||||
}
|
||||
th->status = status;
|
||||
|
||||
if (finalizer_interrupt) {
|
||||
rb_gc_finalize_deferred();
|
||||
}
|
||||
|
||||
if (timer_interrupt) {
|
||||
unsigned long limits_us = TIME_QUANTUM_USEC;
|
||||
|
||||
if (th->priority > 0)
|
||||
limits_us <<= th->priority;
|
||||
else
|
||||
limits_us >>= -th->priority;
|
||||
|
||||
if (status == THREAD_RUNNABLE)
|
||||
th->running_time_us += TIME_QUANTUM_USEC;
|
||||
|
||||
EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0);
|
||||
|
||||
rb_thread_schedule_limits(limits_us);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
|
||||
{
|
||||
rb_threadptr_execute_interrupts_common(th, blocking_timing);
|
||||
}
|
||||
|
||||
void
|
||||
rb_thread_execute_interrupts(VALUE thval)
|
||||
{
|
||||
rb_thread_t *th;
|
||||
GetThreadPtr(thval, th);
|
||||
rb_threadptr_execute_interrupts_common(th, 1);
|
||||
}
|
||||
|
||||
static void
|
||||
rb_threadptr_ready(rb_thread_t *th)
|
||||
{
|
||||
|
@ -1497,7 +1781,12 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
|
|||
return Qnil;
|
||||
}
|
||||
|
||||
exc = rb_make_exception(argc, argv);
|
||||
if (argc == 0) {
|
||||
exc = rb_exc_new(rb_eRuntimeError, 0, 0);
|
||||
}
|
||||
else {
|
||||
exc = rb_make_exception(argc, argv);
|
||||
}
|
||||
rb_threadptr_async_errinfo_enque(th, exc);
|
||||
rb_threadptr_interrupt(th);
|
||||
return Qnil;
|
||||
|
@ -1642,9 +1931,15 @@ rb_thread_kill(VALUE thread)
|
|||
|
||||
thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
|
||||
|
||||
rb_threadptr_async_errinfo_enque(th, eKillSignal);
|
||||
th->status = THREAD_TO_KILL;
|
||||
rb_threadptr_interrupt(th);
|
||||
if (th == GET_THREAD()) {
|
||||
/* kill myself immediately */
|
||||
rb_threadptr_to_kill(th);
|
||||
}
|
||||
else {
|
||||
rb_threadptr_async_errinfo_enque(th, eKillSignal);
|
||||
th->status = THREAD_TO_KILL;
|
||||
rb_threadptr_interrupt(th);
|
||||
}
|
||||
return thread;
|
||||
}
|
||||
|
||||
|
@ -1683,7 +1978,8 @@ rb_thread_s_kill(VALUE obj, VALUE th)
|
|||
static VALUE
|
||||
rb_thread_exit(void)
|
||||
{
|
||||
return rb_thread_kill(GET_THREAD()->self);
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
return rb_thread_kill(th->self);
|
||||
}
|
||||
|
||||
|
||||
|
@ -2704,7 +3000,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
|
|||
if (result < 0) lerrno = errno;
|
||||
}, ubf_select, th);
|
||||
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
|
||||
errno = lerrno;
|
||||
|
||||
|
@ -2910,6 +3206,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
|
|||
double limit = 0;
|
||||
struct timespec ts;
|
||||
struct timespec *timeout = NULL;
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
|
||||
if (tv) {
|
||||
ts.tv_sec = tv->tv_sec;
|
||||
|
@ -2927,9 +3224,9 @@ retry:
|
|||
BLOCKING_REGION({
|
||||
result = ppoll(&fds, 1, timeout, NULL);
|
||||
if (result < 0) lerrno = errno;
|
||||
}, ubf_select, GET_THREAD());
|
||||
}, ubf_select, th);
|
||||
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
|
||||
if (result < 0) {
|
||||
errno = lerrno;
|
||||
|
@ -3673,7 +3970,7 @@ rb_mutex_lock(VALUE self)
|
|||
if (mutex->th == th) mutex_locked(th, self);
|
||||
|
||||
if (interrupted) {
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4755,6 +5052,8 @@ Init_Thread(void)
|
|||
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
|
||||
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
|
||||
#endif
|
||||
rb_define_singleton_method(rb_cThread, "control_interrupt", rb_thread_s_control_interrupt, 1);
|
||||
rb_define_singleton_method(rb_cThread, "check_interrupt", rb_thread_s_check_interrupt, 1);
|
||||
|
||||
rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
|
||||
rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
|
||||
|
|
14
vm_core.h
14
vm_core.h
|
@ -769,25 +769,27 @@ void rb_signal_exec(rb_thread_t *th, int sig);
|
|||
void rb_threadptr_check_signal(rb_thread_t *mth);
|
||||
void rb_threadptr_signal_raise(rb_thread_t *th, int sig);
|
||||
void rb_threadptr_signal_exit(rb_thread_t *th);
|
||||
void rb_threadptr_execute_interrupts(rb_thread_t *);
|
||||
void rb_threadptr_execute_interrupts(rb_thread_t *, int);
|
||||
void rb_threadptr_interrupt(rb_thread_t *th);
|
||||
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
|
||||
void rb_threadptr_async_errinfo_clear(rb_thread_t *th);
|
||||
void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v);
|
||||
VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th);
|
||||
int rb_threadptr_async_errinfo_active_p(rb_thread_t *th);
|
||||
|
||||
void rb_thread_lock_unlock(rb_thread_lock_t *);
|
||||
void rb_thread_lock_destroy(rb_thread_lock_t *);
|
||||
|
||||
#define RUBY_VM_CHECK_INTS_TH(th) do { \
|
||||
#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \
|
||||
if (UNLIKELY((th)->interrupt_flag)) { \
|
||||
rb_threadptr_execute_interrupts(th); \
|
||||
rb_threadptr_execute_interrupts(th, 1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define RUBY_VM_CHECK_INTS() \
|
||||
RUBY_VM_CHECK_INTS_TH(GET_THREAD())
|
||||
#define RUBY_VM_CHECK_INTS(th) do { \
|
||||
if (UNLIKELY((th)->interrupt_flag)) { \
|
||||
rb_threadptr_execute_interrupts(th, 0); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/* tracer */
|
||||
void
|
||||
|
|
|
@ -103,7 +103,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv,
|
|||
if (!klass || !(me = rb_method_entry(klass, id))) {
|
||||
return method_missing(recv, id, argc, argv, NOEX_SUPER);
|
||||
}
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
if (!(def = me->def)) return Qnil;
|
||||
goto again;
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv,
|
|||
rb_bug("vm_call0: unsupported method type (%d)", def->type);
|
||||
val = Qundef;
|
||||
}
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
return val;
|
||||
}
|
||||
|
||||
|
|
|
@ -706,7 +706,7 @@ vm_call_method(rb_thread_t *th, rb_control_frame_t *cfp,
|
|||
}
|
||||
}
|
||||
|
||||
RUBY_VM_CHECK_INTS();
|
||||
RUBY_VM_CHECK_INTS(th);
|
||||
return val;
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче