Introduce `Fiber#storage` for inheritable fiber-scoped variables. (#6612)

This commit is contained in:
Samuel Williams 2022-12-01 23:00:33 +13:00 коммит произвёл GitHub
Родитель 9869bd1d61
Коммит 0436f1e15a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 442 добавлений и 56 удалений

40
NEWS.md
Просмотреть файл

@ -102,6 +102,45 @@ Note that each entry is kept to a minimum, see links for details.
Note: We're only listing outstanding class updates.
* Fiber
* Introduce `Fiber.[]` and `Fiber.[]=` for inheritable fiber storage.
Introduce `Fiber#storage` and `Fiber#storage=` (experimental) for getting
and resetting the current storage. Introduce `Fiber.new(storage:)` for
setting the storage when creating a fiber. [[Feature #19078]]
Existing Thread and Fiber local variables can be tricky to use. Thread
local variables are shared between all fibers, making it hard to isolate,
while Fiber local variables can be hard to share. It is often desirable
to define a unit of execution ("execution context") such that some state
is shared between all fibers and threads created in that context. This is
what Fiber storage provides.
```ruby
def log(message)
puts "#{Fiber[:request_id]}: #{message}"
end
def handle_requests
while request = read_request
Fiber.schedule do
Fiber[:request_id] = SecureRandom.uuid
request.messages.each do |message|
Fiber.schedule do
log("Handling #{message}") # Log includes inherited request_id.
end
end
end
end
end
```
You should generally consider Fiber storage for any state which you want
to be shared implicitly between all fibers and threads created in a given
context, e.g. a connection pool, a request id, a logger level,
environment variables, configuration, etc.
* Fiber::Scheduler
* Introduce `Fiber::Scheduler#io_select` for non-blocking IO.select.
@ -555,3 +594,4 @@ The following deprecated APIs are removed.
[Bug #19100]: https://bugs.ruby-lang.org/issues/19100
[Feature #19135]: https://bugs.ruby-lang.org/issues/19135
[Feature #19138]: https://bugs.ruby-lang.org/issues/19138
[Feature #19078]: https://bugs.ruby-lang.org/issues/19078

314
cont.c
Просмотреть файл

@ -271,7 +271,7 @@ struct rb_fiber_struct {
static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0};
static ID fiber_initialize_keywords[2] = {0};
static ID fiber_initialize_keywords[3] = {0};
/*
* FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL
@ -1156,7 +1156,9 @@ fiber_memsize(const void *ptr)
*/
if (saved_ec->local_storage && fiber != th->root_fiber) {
size += rb_id_table_memsize(saved_ec->local_storage);
size += rb_obj_memsize_of(saved_ec->storage);
}
size += cont_memsize(&fiber->cont);
return size;
}
@ -2007,11 +2009,186 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
return fiber;
}
static VALUE
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking)
/*
 * Attach a Ruby-level Fiber object to the fiber that the thread's execution
 * context is already running on, and record it as the thread's root fiber.
 *
 * Returns the (now object-backed) fiber.
 */
static rb_fiber_t *
root_fiber_alloc(rb_thread_t *th)
{
    VALUE root_value = fiber_alloc(rb_cFiber);
    rb_fiber_t *root = th->ec->fiber_ptr;

    // The freshly allocated wrapper must not own a fiber yet, and the fiber
    // being wrapped must be a resumed fiber context.
    VM_ASSERT(DATA_PTR(root_value) == NULL);
    VM_ASSERT(root->cont.type == FIBER_CONTEXT);
    VM_ASSERT(FIBER_RESUMED_P(root));

    // Link the wrapper object and the fiber in both directions.
    DATA_PTR(root_value) = root;
    root->cont.self = root_value;

    th->root_fiber = root;

    coroutine_initialize_main(&root->context);

    return root;
}
/*
 * Return the fiber of the current execution context. If that fiber has no
 * Ruby object attached yet (cont.self is 0), one is allocated first via
 * root_fiber_alloc().
 */
static inline rb_fiber_t*
fiber_current(void)
{
    rb_execution_context_t *current_ec = GET_EC();

    if (!current_ec->fiber_ptr->cont.self) {
        root_fiber_alloc(rb_ec_thread_ptr(current_ec));
    }

    return current_ec->fiber_ptr;
}
static inline VALUE
current_fiber_storage(void)
{
rb_execution_context_t *ec = GET_EC();
return ec->storage;
}
/* Return a duplicate of the current execution context's storage. */
static inline VALUE
inherit_fiber_storage(void)
{
    VALUE parent_storage = current_fiber_storage();

    return rb_obj_dup(parent_storage);
}
/* Store +storage+ in the saved execution context of +fiber+ (no copy, no validation). */
static inline void
fiber_storage_set(struct rb_fiber_struct *fiber, VALUE storage)
{
    rb_execution_context_t *saved = &fiber->cont.saved_ec;

    saved->storage = storage;
}
/*
 * Return the storage hash of +fiber+. When the fiber has no storage yet
 * (nil), an empty hash is created, attached to the fiber and returned.
 */
static inline VALUE
fiber_storage_get(rb_fiber_t *fiber)
{
    VALUE existing = fiber->cont.saved_ec.storage;

    if (existing != Qnil) {
        return existing;
    }

    // Lazily create the storage on first access.
    VALUE fresh = rb_hash_new();
    fiber_storage_set(fiber, fresh);

    return fresh;
}
/**
 *  call-seq: Fiber.current.storage -> hash (dup)
 *
 *  Returns a copy of the storage hash for the fiber. Because a copy is
 *  returned, mutating the result does not change the fiber's own storage.
 */
static VALUE
rb_fiber_storage_get(VALUE self)
{
    VALUE storage = fiber_storage_get(fiber_ptr(self));

    return rb_obj_dup(storage);
}
/*
 * rb_hash_foreach callback used by fiber_storage_validate(): checks one
 * key of a candidate storage hash and always continues the iteration.
 *
 * NOTE(review): the return value of rb_check_id() is ignored; the call is
 * presumably made only for its argument check, which is expected to raise
 * for keys that cannot name a variable (the test suite expects TypeError for
 * an arbitrary object key). The value and the extra argument are unused.
 */
static int
fiber_storage_validate_each(VALUE key, VALUE value, VALUE _argument)
{
rb_check_id(&key);
return ST_CONTINUE;
}
/*
 * Check that +value+ is acceptable as fiber storage.
 *
 * Accepts nil (meaning "no storage yet"; fiber_storage_get() creates an
 * empty hash on first access) or a Hash whose keys pass
 * fiber_storage_validate_each(). Raises TypeError for any other object.
 */
static void
fiber_storage_validate(VALUE value)
{
    // nil is documented as valid for both Fiber.new(storage: nil) and
    // Fiber#storage = nil, so it must not be rejected here.
    if (value == Qnil) return;

    if (!RB_TYPE_P(value, T_HASH)) {
        rb_raise(rb_eTypeError, "storage must be a hash");
    }

    rb_hash_foreach(value, fiber_storage_validate_each, Qundef);
}
/**
 *  call-seq: Fiber.current.storage = hash
 *
 *  Replaces the storage of the fiber with a copy of the given value, after
 *  checking it with the storage validation rules. This feature is
 *  experimental and may change in the future.
 *
 *  Be careful with this method: replacing the whole storage may
 *  inadvertently clear important fiber-storage state. Prefer assigning
 *  individual keys with Fiber[]= where possible.
 *
 *  You can also use Fiber.new(storage: nil) to create a fiber with an empty
 *  storage.
 *
 *  Example:
 *
 *    while request = request_queue.pop
 *      # Reset the per-request state:
 *      Fiber.current.storage = nil
 *      handle_request(request)
 *    end
 */
static VALUE
rb_fiber_storage_set(VALUE self, VALUE value)
{
    fiber_storage_validate(value);

    // Store a duplicate so later changes to the caller's object are not
    // visible through the fiber's storage.
    fiber_storage_set(fiber_ptr(self), rb_obj_dup(value));

    return value;
}
/**
 *  call-seq: Fiber[key] -> value
 *
 *  Returns the value of the fiber storage variable identified by +key+, or
 *  +nil+ if no such variable has been set.
 *
 *  The +key+ must be a Symbol, otherwise a TypeError is raised. Values are
 *  set by Fiber[]= or by replacing the whole storage with Fiber#storage=.
 *
 *  See also Fiber[]=.
 */
static VALUE
rb_fiber_storage_aref(VALUE class, VALUE key)
{
    // Enforce the documented contract directly. The previous rb_check_id()
    // call returned 0 for symbols without an interned ID (and could rewrite
    // the key), which made lookups of such keys silently return nil.
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current());
    if (storage == Qnil) return Qnil;

    return rb_hash_aref(storage, key);
}
/**
 *  call-seq: Fiber[key] = value
 *
 *  Assign +value+ to the fiber storage variable identified by +key+.
 *  The variable is created if it doesn't exist.
 *
 *  +key+ must be a Symbol, otherwise a TypeError is raised.
 *
 *  See also Fiber[].
 */
static VALUE
rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
{
    // Enforce the documented contract directly. The previous rb_check_id()
    // call returned 0 for symbols without an interned ID, in which case the
    // assignment was silently dropped.
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current());

    return rb_hash_aset(storage, key, value);
}
static VALUE
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage)
{
if (storage == Qundef || storage == Qtrue) {
// The default, inherit storage (dup) from the current fiber:
storage = inherit_fiber_storage();
}
else if (storage == Qfalse) {
storage = current_fiber_storage();
}
else /* nil, hash, etc. */ {
fiber_storage_validate(storage);
storage = rb_obj_dup(storage);
}
rb_fiber_t *fiber = fiber_t_alloc(self, blocking);
fiber->cont.saved_ec.storage = storage;
fiber->first_proc = proc;
fiber->stack.base = NULL;
fiber->stack.pool = fiber_pool;
@ -2044,19 +2221,27 @@ rb_fiber_pool_default(VALUE pool)
return &shared_fiber_pool;
}
/*
 * Give +fiber+ its own duplicate of the storage held by +ec+ and return that
 * duplicate.
 */
VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb_fiber_struct *fiber)
{
    VALUE inherited = rb_obj_dup(ec->storage);

    fiber->cont.saved_ec.storage = inherited;

    return inherited;
}
/* :nodoc: */
static VALUE
rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
{
VALUE pool = Qnil;
VALUE blocking = Qfalse;
VALUE storage = Qundef;
if (kw_splat != RB_NO_KEYWORDS) {
VALUE options = Qnil;
VALUE arguments[2] = {Qundef};
VALUE arguments[3] = {Qundef};
argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);
rb_get_kwargs(options, fiber_initialize_keywords, 0, 3, arguments);
if (!UNDEF_P(arguments[0])) {
blocking = arguments[0];
@ -2065,33 +2250,73 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
if (!UNDEF_P(arguments[1])) {
pool = arguments[1];
}
storage = arguments[2];
}
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage);
}
/*
* call-seq:
* Fiber.new(blocking: false) { |*args| ... } -> fiber
* Fiber.new(blocking: false, storage: true) { |*args| ... } -> fiber
*
* Creates new Fiber. Initially, the fiber is not running and can be resumed with
* #resume. Arguments to the first #resume call will be passed to the block:
* Creates new Fiber. Initially, the fiber is not running and can be resumed
* with #resume. Arguments to the first #resume call will be passed to the
* block:
*
* f = Fiber.new do |initial|
* current = initial
* loop do
* puts "current: #{current.inspect}"
* current = Fiber.yield
* end
* end
* f.resume(100) # prints: current: 100
* f.resume(1, 2, 3) # prints: current: [1, 2, 3]
* f.resume # prints: current: nil
* # ... and so on ...
* f = Fiber.new do |initial|
* current = initial
* loop do
* puts "current: #{current.inspect}"
* current = Fiber.yield
* end
* end
* f.resume(100) # prints: current: 100
* f.resume(1, 2, 3) # prints: current: [1, 2, 3]
* f.resume # prints: current: nil
* # ... and so on ...
*
* If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current thread
* has a Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking
* Fibers" section in class docs).
* If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current
* thread has a Fiber.scheduler defined, the Fiber becomes non-blocking (see
* "Non-blocking Fibers" section in class docs).
*
* If the <tt>storage</tt> is unspecified, the default is to inherit a copy of
* the storage from the current fiber. This is the same as specifying
* <tt>storage: true</tt>.
*
* Fiber[:x] = 1
* Fiber.new do
* Fiber[:x] # => 1
* Fiber[:x] = 2
* end.resume
* Fiber[:x] # => 1
*
* If the <tt>storage</tt> is <tt>false</tt>, this function uses the current
* fiber's storage by reference. This is used by Enumerator to create its
* hidden fiber.
*
* Fiber[:count] = 0
* enumerator = Enumerator.new do |y|
* loop{y << (Fiber[:count] += 1)}
* end
* Fiber[:count] # => 0
* enumerator.next # => 1
* Fiber[:count] # => 1
*
* If the given <tt>storage</tt> is <tt>nil</tt>, this function will lazy
* initialize the internal storage, which starts as an empty hash.
*
* Fiber[:x] = "Hello World"
* Fiber.new(storage: nil) do
* Fiber[:x] # => nil
* end.resume
*
* Otherwise, the given <tt>storage</tt> is used as the new fiber's storage,
* and it must be an instance of Hash.
*
* Explicitly using `storage: true/false` is currently experimental and may
* change in the future.
*/
static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
@ -2099,10 +2324,16 @@ rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
/*
 * Create a blocking Fiber whose body is the C function +func+ (called with
 * +obj+), using the default fiber pool and the given +storage+ selector.
 * See fiber_initialize() for how +storage+ is interpreted.
 */
VALUE
rb_fiber_new_storage(rb_block_call_func_t func, VALUE obj, VALUE storage)
{
    VALUE fiber_value = fiber_alloc(rb_cFiber);
    VALUE body = rb_proc_new(func, obj);
    struct fiber_pool *pool = rb_fiber_pool_default(Qnil);

    return fiber_initialize(fiber_value, body, pool, 1, storage);
}
VALUE
rb_fiber_new(rb_block_call_func_t func, VALUE obj)
{
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1);
return rb_fiber_new_storage(func, obj, Qtrue);
}
static VALUE
@ -2276,25 +2507,6 @@ rb_fiber_start(rb_fiber_t *fiber)
rb_fiber_terminate(fiber, need_interrupt, err);
}
static rb_fiber_t *
root_fiber_alloc(rb_thread_t *th)
{
VALUE fiber_value = fiber_alloc(rb_cFiber);
rb_fiber_t *fiber = th->ec->fiber_ptr;
VM_ASSERT(DATA_PTR(fiber_value) == NULL);
VM_ASSERT(fiber->cont.type == FIBER_CONTEXT);
VM_ASSERT(FIBER_RESUMED_P(fiber));
th->root_fiber = fiber;
DATA_PTR(fiber_value) = fiber;
fiber->cont.self = fiber_value;
coroutine_initialize_main(&fiber->context);
return fiber;
}
// Set up a "root fiber", which is the fiber that every Ractor has.
void
rb_threadptr_root_fiber_setup(rb_thread_t *th)
@ -2348,16 +2560,6 @@ rb_threadptr_root_fiber_terminate(rb_thread_t *th)
rb_ec_clear_vm_stack(th->ec);
}
static inline rb_fiber_t*
fiber_current(void)
{
rb_execution_context_t *ec = GET_EC();
if (ec->fiber_ptr->cont.self == 0) {
root_fiber_alloc(rb_ec_thread_ptr(ec));
}
return ec->fiber_ptr;
}
static inline rb_fiber_t*
return_fiber(bool terminate)
{
@ -3146,6 +3348,7 @@ Init_Cont(void)
fiber_initialize_keywords[0] = rb_intern_const("blocking");
fiber_initialize_keywords[1] = rb_intern_const("pool");
fiber_initialize_keywords[2] = rb_intern_const("storage");
const char *fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
if (fiber_shared_fiber_pool_free_stacks) {
@ -3158,8 +3361,13 @@ Init_Cont(void)
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_singleton_method(rb_cFiber, "current", rb_fiber_s_current, 0);
rb_define_singleton_method(rb_cFiber, "blocking", rb_fiber_blocking, 0);
rb_define_singleton_method(rb_cFiber, "[]", rb_fiber_storage_aref, 1);
rb_define_singleton_method(rb_cFiber, "[]=", rb_fiber_storage_aset, 2);
rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
rb_define_method(rb_cFiber, "storage", rb_fiber_storage_get, 0);
rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "raise", rb_fiber_m_raise, -1);
rb_define_method(rb_cFiber, "backtrace", rb_fiber_backtrace, -1);

Просмотреть файл

@ -766,7 +766,8 @@ next_init(VALUE obj, struct enumerator *e)
{
VALUE curr = rb_fiber_current();
e->dst = curr;
e->fib = rb_fiber_new(next_i, obj);
// We inherit the fiber storage by reference, not by copy, by specifying Qfalse here.
e->fib = rb_fiber_new_storage(next_i, obj, Qfalse);
e->lookahead = Qundef;
}

Просмотреть файл

@ -38,6 +38,27 @@ RBIMPL_SYMBOL_EXPORT_BEGIN()
*/
VALUE rb_fiber_new(rb_block_call_func_t func, VALUE callback_obj);
/**
 * Creates a Fiber instance from a C-backed block with the specified storage.
 *
 * If the given storage is Qundef or Qtrue, this function is equivalent to
 * rb_fiber_new(): the new fiber receives a copy of the current fiber's
 * storage.
 *
 * If the given storage is Qfalse, this function uses the current fiber's
 * storage by reference.
 *
 * If the given storage is Qnil, this function will lazily initialize the
 * internal storage, which starts off empty (without any inheritance).
 *
 * Otherwise, the given storage is validated and a copy of it is used as the
 * internal storage.
 *
 * @param[in]  func          A function, to become the fiber's body.
 * @param[in]  callback_obj  Passed as-is to `func`.
 * @param[in]  storage       Storage selector, as described above.
 * @return     An allocated new instance of rb_cFiber, which is ready to be
 *             "resume"d.
 */
VALUE rb_fiber_new_storage(rb_block_call_func_t func, VALUE callback_obj, VALUE storage);
/**
* Queries the fiber which is calling this function. Any ruby execution
* context has its fiber, either explicitly or implicitly.

Просмотреть файл

@ -22,6 +22,9 @@ void rb_jit_cont_init(void);
void rb_jit_cont_each_iseq(rb_iseq_callback callback, void *data);
void rb_jit_cont_finish(void);
// Duplicate the inheritable storage of the given execution context into the
// specified fiber, and return the duplicate.
VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb_fiber_struct *fiber);
VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber);
unsigned int rb_fiberptr_blocking(struct rb_fiber_struct *fiber);
struct rb_execution_context_struct * rb_fiberptr_get_ec(struct rb_fiber_struct *fiber);

Просмотреть файл

@ -442,6 +442,7 @@ module RubyVM::MJIT
local_storage: [CType::Pointer.new { self.rb_id_table }, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), local_storage)")],
local_storage_recursive_hash: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), local_storage_recursive_hash)")],
local_storage_recursive_hash_for_trace: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), local_storage_recursive_hash_for_trace)")],
storage: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), storage)")],
root_lep: [CType::Pointer.new { self.VALUE }, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), root_lep)")],
root_svar: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), root_svar)")],
ensure_list: [CType::Pointer.new { self.rb_ensure_list_t }, Primitive.cexpr!("OFFSETOF((*((struct rb_execution_context_struct *)NULL)), ensure_list)")],

101
test/fiber/test_storage.rb Normal file
Просмотреть файл

@ -0,0 +1,101 @@
# frozen_string_literal: true
require 'test/unit'
# Tests for inheritable fiber storage: Fiber[], Fiber[]=, Fiber#storage,
# Fiber#storage= and Fiber.new(storage:).
class TestFiberStorage < Test::Unit::TestCase
  # Fiber#storage returns a Hash reflecting keys assigned via Fiber[]=.
  def test_storage
    Fiber.new do
      Fiber[:x] = 10
      assert_kind_of Hash, Fiber.current.storage
      assert_predicate Fiber.current.storage, :any?
    end.resume
  end

  # A child fiber sees the parent's values, but its writes do not leak back.
  def test_storage_inherited
    Fiber.new do
      Fiber[:foo] = :bar

      Fiber.new do
        assert_equal :bar, Fiber[:foo]
        Fiber[:bar] = :baz
      end.resume

      assert_nil Fiber[:bar]
    end.resume
  end

  def test_variable_assignment
    Fiber.new do
      Fiber[:foo] = :bar
      assert_equal :bar, Fiber[:foo]
    end.resume
  end

  # Replacing the whole storage makes its keys visible through Fiber[].
  def test_storage_assignment
    Fiber.new do
      Fiber.current.storage = {foo: :bar}
      assert_equal :bar, Fiber[:foo]
    end.resume
  end

  # Storage assigned wholesale is inherited by fibers created afterwards.
  def test_inherited_storage
    Fiber.new do
      Fiber.current.storage = {foo: :bar}
      f = Fiber.new do
        assert_equal :bar, Fiber[:foo]
      end
      f.resume
    end.resume
  end

  def test_enumerator_inherited_storage
    Fiber.new do
      Fiber[:item] = "Hello World"

      enumerator = Enumerator.new do |out|
        out << Fiber.current
        out << Fiber[:item]
      end

      # The fiber within the enumerator is not equal to the current...
      assert_not_equal Fiber.current, enumerator.next

      # But it inherited the storage from the current fiber:
      assert_equal "Hello World", enumerator.next
    end.resume
  end

  # A new thread starts with a copy of the creating fiber's storage.
  def test_thread_inherited_storage
    Fiber.new do
      Fiber[:x] = 10

      x = Thread.new do
        Fiber[:y] = 20
        Fiber[:x]
      end.value

      assert_equal 10, x
      assert_nil Fiber[:y]
    end.resume
  end

  def test_enumerator_count
    Fiber.new do
      Fiber[:count] = 0

      enumerator = Enumerator.new do |y|
        # Since the fiber is an implementation detail, the storage is shared with the parent:
        Fiber[:count] += 1
        y << Fiber[:count]
      end

      assert_equal 1, enumerator.next
      assert_equal 1, Fiber[:count]
    end.resume
  end

  # Storage keys that cannot name a variable are rejected.
  def test_storage_assignment_type_error
    assert_raise(TypeError) do
      Fiber.new(storage: {Object.new => "bar"}) {}
    end
  end
end

Просмотреть файл

@ -813,6 +813,8 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
"can't start a new thread (frozen ThreadGroup)");
}
rb_fiber_inherit_storage(ec, th->ec->fiber_ptr);
switch (params->type) {
case thread_invoke_type_proc:
th->invoke_type = thread_invoke_type_proc;

8
vm.c
Просмотреть файл

@ -3047,7 +3047,7 @@ vm_init2(rb_vm_t *vm)
}
void
rb_execution_context_update(const rb_execution_context_t *ec)
rb_execution_context_update(rb_execution_context_t *ec)
{
/* update VM stack */
if (ec->vm_stack) {
@ -3087,6 +3087,8 @@ rb_execution_context_update(const rb_execution_context_t *ec)
cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp);
}
}
ec->storage = rb_gc_location(ec->storage);
}
static enum rb_id_table_iterator_result
@ -3154,6 +3156,8 @@ rb_execution_context_mark(const rb_execution_context_t *ec)
RUBY_MARK_UNLESS_NULL(ec->local_storage_recursive_hash);
RUBY_MARK_UNLESS_NULL(ec->local_storage_recursive_hash_for_trace);
RUBY_MARK_UNLESS_NULL(ec->private_const_reference);
RUBY_MARK_MOVABLE_UNLESS_NULL(ec->storage);
}
void rb_fiber_mark_self(rb_fiber_t *fib);
@ -3344,6 +3348,8 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm)
th->ec->local_storage_recursive_hash = Qnil;
th->ec->local_storage_recursive_hash_for_trace = Qnil;
th->ec->storage = Qnil;
#if OPT_CALL_THREADED_CODE
th->retval = Qundef;
#endif

Просмотреть файл

@ -968,6 +968,9 @@ struct rb_execution_context_struct {
VALUE local_storage_recursive_hash;
VALUE local_storage_recursive_hash_for_trace;
/* Inheritable fiber storage. */
VALUE storage;
/* eval env */
const VALUE *root_lep;
VALUE root_svar;
@ -2010,7 +2013,7 @@ void rb_threadptr_pending_interrupt_clear(rb_thread_t *th);
void rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v);
VALUE rb_ec_get_errinfo(const rb_execution_context_t *ec);
void rb_ec_error_print(rb_execution_context_t * volatile ec, volatile VALUE errinfo);
void rb_execution_context_update(const rb_execution_context_t *ec);
void rb_execution_context_update(rb_execution_context_t *ec);
void rb_execution_context_mark(const rb_execution_context_t *ec);
void rb_fiber_close(rb_fiber_t *fib);
void Init_native_thread(rb_thread_t *th);