Add Fiber#kill, similar to Thread#kill. (#7823)

This commit is contained in:
Samuel Williams 2023-05-18 23:33:42 +09:00 коммит произвёл GitHub
Родитель b695f58d52
Коммит 2df5a697e2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 179 добавлений и 27 удалений

Просмотреть файл

@ -3453,6 +3453,7 @@ cont.$(OBJEXT): $(top_srcdir)/internal/sanitizers.h
cont.$(OBJEXT): $(top_srcdir)/internal/serial.h
cont.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
cont.$(OBJEXT): $(top_srcdir)/internal/string.h
cont.$(OBJEXT): $(top_srcdir)/internal/thread.h
cont.$(OBJEXT): $(top_srcdir)/internal/variable.h
cont.$(OBJEXT): $(top_srcdir)/internal/vm.h
cont.$(OBJEXT): $(top_srcdir)/internal/warnings.h

99
cont.c
Просмотреть файл

@ -28,6 +28,7 @@ extern int madvise(caddr_t, size_t, int);
#include "eval_intern.h"
#include "internal.h"
#include "internal/cont.h"
#include "internal/thread.h"
#include "internal/error.h"
#include "internal/gc.h"
#include "internal/proc.h"
@ -229,18 +230,18 @@ typedef struct rb_context_struct {
struct rb_jit_cont *jit_cont; // Continuation contexts for JITs
} rb_context_t;
/*
* Fiber status:
* [Fiber.new] ------> FIBER_CREATED
* | [Fiber#resume]
* v
* +--> FIBER_RESUMED ----+
* [Fiber#resume] | | [Fiber.yield] |
* | v |
* +-- FIBER_SUSPENDED | [Terminate]
* |
* FIBER_TERMINATED <-+
* [Fiber.new] ------> FIBER_CREATED ----> [Fiber#kill] --> |
* | [Fiber#resume] |
* v |
* +--> FIBER_RESUMED ----> [return] ------> |
* [Fiber#resume] | | [Fiber.yield/transfer] |
* [Fiber#transfer] | v |
* +--- FIBER_SUSPENDED --> [Fiber#kill] --> |
* |
* |
* FIBER_TERMINATED <-------------------+
*/
enum fiber_status {
FIBER_CREATED,
@ -266,6 +267,8 @@ struct rb_fiber_struct {
unsigned int yielding : 1;
unsigned int blocking : 1;
unsigned int killed : 1;
struct coroutine_context context;
struct fiber_pool_stack stack;
};
@ -1996,6 +1999,7 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
fiber->cont.self = fiber_value;
fiber->cont.type = FIBER_CONTEXT;
fiber->blocking = blocking;
fiber->killed = 0;
cont_init(&fiber->cont, th);
fiber->cont.saved_ec.fiber_ptr = fiber;
@ -2522,13 +2526,16 @@ rb_fiber_start(rb_fiber_t *fiber)
if (state == TAG_RAISE) {
// noop...
}
else if (state == TAG_FATAL && err == RUBY_FATAL_FIBER_KILLED) {
need_interrupt = FALSE;
err = Qfalse;
}
else if (state == TAG_FATAL) {
rb_threadptr_pending_interrupt_enque(th, err);
}
else {
err = rb_vm_make_jump_tag_but_local_jump(state, err);
}
need_interrupt = TRUE;
}
rb_fiber_terminate(fiber, need_interrupt, err);
@ -2547,6 +2554,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
fiber->cont.saved_ec.fiber_ptr = fiber;
fiber->cont.saved_ec.thread_ptr = th;
fiber->blocking = 1;
fiber->killed = 0;
fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
th->ec = &fiber->cont.saved_ec;
// When rb_threadptr_root_fiber_setup is called for the first time, rb_rjit_enabled and
@ -2649,6 +2657,19 @@ fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th)
fiber_setcontext(next_fiber, fiber);
}
static void
fiber_check_killed(rb_fiber_t *fiber)
{
VM_ASSERT(fiber == fiber_current());
if (fiber->killed) {
rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr;
thread->ec->errinfo = RUBY_FATAL_FIBER_KILLED;
EC_JUMP_TAG(thread->ec, RUBY_TAG_FATAL);
}
}
static inline VALUE
fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fiber_t *resuming_fiber, bool yielding)
{
@ -2737,7 +2758,14 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fi
current_fiber = th->ec->fiber_ptr;
value = current_fiber->cont.value;
if (current_fiber->cont.argc == -1) rb_exc_raise(value);
fiber_check_killed(current_fiber);
if (current_fiber->cont.argc == -1) {
// Fiber#raise will trigger this path.
rb_exc_raise(value);
}
return value;
}
@ -3175,14 +3203,9 @@ rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass)
}
static VALUE
fiber_raise(rb_fiber_t *fiber, int argc, const VALUE *argv)
fiber_raise(rb_fiber_t *fiber, VALUE exception)
{
VALUE exception = rb_make_exception(argc, argv);
if (fiber->resuming_fiber) {
rb_raise(rb_eFiberError, "attempt to raise a resuming fiber");
}
else if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) {
if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) {
return fiber_transfer_kw(fiber, -1, &exception, RB_NO_KEYWORDS);
}
else {
@ -3193,7 +3216,9 @@ fiber_raise(rb_fiber_t *fiber, int argc, const VALUE *argv)
VALUE
rb_fiber_raise(VALUE fiber, int argc, const VALUE *argv)
{
return fiber_raise(fiber_ptr(fiber), argc, argv);
VALUE exception = rb_make_exception(argc, argv);
return fiber_raise(fiber_ptr(fiber), exception);
}
/*
@ -3223,6 +3248,39 @@ rb_fiber_m_raise(int argc, VALUE *argv, VALUE self)
return rb_fiber_raise(self, argc, argv);
}
/*
* call-seq:
* fiber.kill -> nil
*
* Terminates +fiber+ by raising an uncatchable exception, returning
* the terminated Fiber.
*
* If the fiber has not been started, transition directly to the terminated state.
*
* If the fiber is already terminated, does nothing.
*/
static VALUE
rb_fiber_m_kill(VALUE self)
{
rb_fiber_t *fiber = fiber_ptr(self);
if (fiber->killed) return Qfalse;
fiber->killed = 1;
if (fiber->status == FIBER_CREATED) {
fiber->status = FIBER_TERMINATED;
}
else if (fiber->status != FIBER_TERMINATED) {
if (fiber_current() == fiber) {
fiber_check_killed(fiber);
} else {
fiber_raise(fiber_ptr(self), Qnil);
}
}
return self;
}
/*
* call-seq:
* Fiber.current -> fiber
@ -3398,6 +3456,7 @@ Init_Cont(void)
rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "raise", rb_fiber_m_raise, -1);
rb_define_method(rb_cFiber, "kill", rb_fiber_m_kill, 0);
rb_define_method(rb_cFiber, "backtrace", rb_fiber_backtrace, -1);
rb_define_method(rb_cFiber, "backtrace_locations", rb_fiber_backtrace_locations, -1);
rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);

Просмотреть файл

@ -29,6 +29,10 @@ struct rb_thread_struct; /* in vm_core.h */
#define COVERAGE_TARGET_ONESHOT_LINES 8
#define COVERAGE_TARGET_EVAL 16
#define RUBY_FATAL_THREAD_KILLED INT2FIX(0)
#define RUBY_FATAL_THREAD_TERMINATED INT2FIX(1)
#define RUBY_FATAL_FIBER_KILLED RB_INT2FIX(2)
VALUE rb_obj_is_mutex(VALUE obj);
VALUE rb_suppress_tracing(VALUE (*func)(VALUE), VALUE arg);
void rb_thread_execute_interrupts(VALUE th);

Просмотреть файл

@ -0,0 +1,90 @@
require_relative '../../spec_helper'
require_relative 'fixtures/classes'
require_relative '../../shared/kernel/raise'
ruby_version_is "3.3" do
describe "Fiber#kill" do
it "kills a non-resumed fiber" do
fiber = Fiber.new{}
fiber.alive?.should == true
fiber.kill
fiber.alive?.should == false
end
it "kills a resumed fiber" do
fiber = Fiber.new{while true; Fiber.yield; end}
fiber.resume
fiber.alive?.should == true
fiber.kill
fiber.alive?.should == false
end
it "can kill itself" do
fiber = Fiber.new do
Fiber.current.kill
end
fiber.alive?.should == true
fiber.resume
fiber.alive?.should == false
end
it "kills a resumed fiber from a child" do
parent = Fiber.new do
child = Fiber.new do
parent.kill
parent.alive?.should == true
end
child.resume
end
parent.resume
parent.alive?.should == false
end
it "executes the ensure block" do
ensure_executed = false
fiber = Fiber.new do
while true; Fiber.yield; end
ensure
ensure_executed = true
end
fiber.resume
fiber.kill
ensure_executed.should == true
end
it "does not execute rescue block" do
rescue_executed = false
fiber = Fiber.new do
while true; Fiber.yield; end
rescue Exception
rescue_executed = true
end
fiber.resume
fiber.kill
rescue_executed.should == false
end
it "repeatedly kills a fiber" do
fiber = Fiber.new do
while true; Fiber.yield; end
ensure
while true; Fiber.yield; end
end
fiber.kill
fiber.alive?.should == false
end
end
end

Просмотреть файл

@ -149,8 +149,6 @@ NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
static int consume_communication_pipe(int fd);
static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
#define eKillSignal INT2FIX(0)
#define eTerminateSignal INT2FIX(1)
static volatile int system_working = 1;
struct waiting_fd {
@ -388,7 +386,7 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
if (th != main_thread) {
RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED);
rb_threadptr_interrupt(th);
RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
@ -2337,8 +2335,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
if (UNDEF_P(err)) {
/* no error */
}
else if (err == eKillSignal /* Thread#kill received */ ||
err == eTerminateSignal /* Terminate thread */ ||
else if (err == RUBY_FATAL_THREAD_KILLED /* Thread#kill received */ ||
err == RUBY_FATAL_THREAD_TERMINATED /* Terminate thread */ ||
err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
terminate_interrupt = 1;
}
@ -2569,7 +2567,7 @@ rb_thread_kill(VALUE thread)
}
else {
threadptr_check_pending_interrupt_queue(target_th);
rb_threadptr_pending_interrupt_enque(target_th, eKillSignal);
rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
rb_threadptr_interrupt(target_th);
}

Просмотреть файл

@ -1616,7 +1616,7 @@ vm_throw_continue(const rb_execution_context_t *ec, VALUE err)
/* continue throw */
if (FIXNUM_P(err)) {
ec->tag->state = FIX2INT(err);
ec->tag->state = RUBY_TAG_FATAL;
}
else if (SYMBOL_P(err)) {
ec->tag->state = TAG_THROW;