зеркало из https://github.com/github/ruby.git
Introduce `Fiber::Scheduler#io_select` hook for non-blocking `IO.select`. (#6559)
This commit is contained in:
Родитель
1acc1a5c6d
Коммит
8a420670a2
4
NEWS.md
4
NEWS.md
|
@ -102,6 +102,9 @@ Note that each entry is kept to a minimum, see links for details.
|
||||||
|
|
||||||
Note: We're only listing outstanding class updates.
|
Note: We're only listing outstanding class updates.
|
||||||
|
|
||||||
|
* Fiber::Scheduler
|
||||||
|
* Introduce `Fiber::Scheduler#io_select` for non-blocking `IO.select`. [[Feature #19060]]
|
||||||
|
|
||||||
* IO
|
* IO
|
||||||
* Introduce `IO#timeout=` and `IO#timeout` which can cause
|
* Introduce `IO#timeout=` and `IO#timeout` which can cause
|
||||||
`IO::TimeoutError` to be raised if a blocking operation exceeds the
|
`IO::TimeoutError` to be raised if a blocking operation exceeds the
|
||||||
|
@ -354,3 +357,4 @@ The following deprecated APIs are removed.
|
||||||
[Feature #16122]: https://bugs.ruby-lang.org/issues/16122
|
[Feature #16122]: https://bugs.ruby-lang.org/issues/16122
|
||||||
[Feature #18630]: https://bugs.ruby-lang.org/issues/18630
|
[Feature #18630]: https://bugs.ruby-lang.org/issues/18630
|
||||||
[Feature #18589]: https://bugs.ruby-lang.org/issues/18589
|
[Feature #18589]: https://bugs.ruby-lang.org/issues/18589
|
||||||
|
[Feature #19060]: https://bugs.ruby-lang.org/issues/19060
|
||||||
|
|
|
@ -244,7 +244,27 @@ VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io);
|
||||||
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
|
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Nonblocking read from the passed IO.
|
* Non-blocking version of `IO.select`.
|
||||||
|
*
|
||||||
|
* It's possible that this will be emulated using a thread, so you should not
|
||||||
|
* rely on it for high performance.
|
||||||
|
*
|
||||||
|
* @param[in] scheduler Target scheduler.
|
||||||
|
* @param[in] readables An array of readable objects.
|
||||||
|
* @param[in] writables An array of writable objects.
|
||||||
|
* @param[in] exceptables An array of objects that might encounter exceptional conditions.
|
||||||
|
* @param[in] timeout Numeric timeout or nil.
|
||||||
|
* @return What `scheduler.io_select` returns, normally a 3-tuple of arrays of ready objects.
|
||||||
|
*/
|
||||||
|
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Non-blocking version of `IO.select`, `argv` variant.
|
||||||
|
*/
|
||||||
|
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Non-blocking read from the passed IO.
|
||||||
*
|
*
|
||||||
* @param[in] scheduler Target scheduler.
|
* @param[in] scheduler Target scheduler.
|
||||||
* @param[out] io An io object to read from.
|
* @param[out] io An io object to read from.
|
||||||
|
|
9
io.c
9
io.c
|
@ -10851,6 +10851,15 @@ rb_io_advise(int argc, VALUE *argv, VALUE io)
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_f_select(int argc, VALUE *argv, VALUE obj)
|
rb_f_select(int argc, VALUE *argv, VALUE obj)
|
||||||
{
|
{
|
||||||
|
VALUE scheduler = rb_fiber_scheduler_current();
|
||||||
|
if (scheduler != Qnil) {
|
||||||
|
// It's optionally supported.
|
||||||
|
VALUE result = rb_fiber_scheduler_io_selectv(scheduler, argc, argv);
|
||||||
|
if (result != Qundef) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
VALUE timeout;
|
VALUE timeout;
|
||||||
struct select_args args;
|
struct select_args args;
|
||||||
struct timeval timerec;
|
struct timeval timerec;
|
||||||
|
|
21
scheduler.c
21
scheduler.c
|
@ -28,6 +28,7 @@ static ID id_process_wait;
|
||||||
static ID id_io_read, id_io_pread;
|
static ID id_io_read, id_io_pread;
|
||||||
static ID id_io_write, id_io_pwrite;
|
static ID id_io_write, id_io_pwrite;
|
||||||
static ID id_io_wait;
|
static ID id_io_wait;
|
||||||
|
static ID id_io_select;
|
||||||
static ID id_io_close;
|
static ID id_io_close;
|
||||||
|
|
||||||
static ID id_address_resolve;
|
static ID id_address_resolve;
|
||||||
|
@ -51,6 +52,7 @@ Init_Fiber_Scheduler(void)
|
||||||
id_io_pwrite = rb_intern_const("io_pwrite");
|
id_io_pwrite = rb_intern_const("io_pwrite");
|
||||||
|
|
||||||
id_io_wait = rb_intern_const("io_wait");
|
id_io_wait = rb_intern_const("io_wait");
|
||||||
|
id_io_select = rb_intern_const("io_select");
|
||||||
id_io_close = rb_intern_const("io_close");
|
id_io_close = rb_intern_const("io_close");
|
||||||
|
|
||||||
id_address_resolve = rb_intern_const("address_resolve");
|
id_address_resolve = rb_intern_const("address_resolve");
|
||||||
|
@ -231,6 +233,25 @@ rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
|
||||||
return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
|
return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
|
||||||
|
{
|
||||||
|
VALUE arguments[] = {
|
||||||
|
readables, writables, exceptables, timeout
|
||||||
|
};
|
||||||
|
|
||||||
|
return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
|
||||||
|
{
|
||||||
|
// I wondered about extracting argv, and checking if there is only a single
|
||||||
|
// IO instance, and instead calling `io_wait`. However, it would require a
|
||||||
|
// decent amount of work and it would be hard to preserve the exact
|
||||||
|
// semantics of IO.select.
|
||||||
|
|
||||||
|
return rb_check_funcall(scheduler, id_io_select, argc, argv);
|
||||||
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
|
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
|
||||||
{
|
{
|
||||||
|
|
|
@ -197,6 +197,13 @@ class Scheduler
|
||||||
@writable.delete(io)
|
@writable.delete(io)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def io_select(...)
|
||||||
|
# Emulate the operation using a non-blocking thread:
|
||||||
|
Thread.new do
|
||||||
|
IO.select(...)
|
||||||
|
end.value
|
||||||
|
end
|
||||||
|
|
||||||
# Used for Kernel#sleep and Thread::Mutex#sleep
|
# Used for Kernel#sleep and Thread::Mutex#sleep
|
||||||
def kernel_sleep(duration = nil)
|
def kernel_sleep(duration = nil)
|
||||||
# $stderr.puts [__method__, duration, Fiber.current].inspect
|
# $stderr.puts [__method__, duration, Fiber.current].inspect
|
||||||
|
|
|
@ -172,4 +172,26 @@ class TestFiberIO < Test::Unit::TestCase
|
||||||
assert_predicate(i, :closed?)
|
assert_predicate(i, :closed?)
|
||||||
assert_predicate(o, :closed?)
|
assert_predicate(o, :closed?)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_io_select
|
||||||
|
omit "UNIXSocket is not defined!" unless defined?(UNIXSocket)
|
||||||
|
|
||||||
|
UNIXSocket.pair do |r, w|
|
||||||
|
result = nil
|
||||||
|
|
||||||
|
thread = Thread.new do
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
|
Fiber.schedule do
|
||||||
|
w.write("Hello World")
|
||||||
|
result = IO.select([r], [w])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
thread.join
|
||||||
|
|
||||||
|
assert_equal [[r], [w], []], result
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Загрузка…
Ссылка в новой задаче