Support `IO#pread` / `IO#pwrite` using fiber scheduler. (#7594)

* Skip test if non-blocking file IO is not supported.
This commit is contained in:
Samuel Williams 2023-03-31 00:48:55 +13:00 коммит произвёл GitHub
Родитель 6f122965cf
Коммит 648870b5c5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 264 добавлений и 60 удалений

Просмотреть файл

@ -267,10 +267,10 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv);
* Non-blocking read from the passed IO.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[out] buffer Return buffer.
* @param[in] length Requested number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @param[in] io An io object to read from.
* @param[in] buffer The buffer to read to.
* @param[in] length The minimum number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns `[-errno, size]`.
*/
@ -280,9 +280,9 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* Non-blocking write to the passed IO.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] buffer What to write.
* @param[in] length Number of bytes to write.
* @param[in] io An io object to write to.
* @param[in] buffer The buffer to write from.
* @param[in] length The minimum number of bytes to write.
* @param[in] offset The offset in the buffer to write from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns `[-errno, size]`.
@ -293,10 +293,10 @@ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_
* Non-blocking read from the passed IO at the specified offset.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[in] from The offset in the given IO to read the data from.
* @param[out] buffer The buffer to read the data to.
* @param[in] length Requested number of bytes to read.
* @param[in] io An io object to read from.
* @param[in] from The offset to read from.
* @param[in] buffer The buffer to read to.
* @param[in] length The minimum number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pread`.
* @return otherwise What `scheduler.io_pread` returns.
@ -307,10 +307,10 @@ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALU
* Non-blocking write to the passed IO at the specified offset.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] from The offset in the given IO to write the data to.
* @param[in] buffer The buffer to write the data from.
* @param[in] length Number of bytes to write.
* @param[in] io An io object to write to.
* @param[in] from The offset to write to.
* @param[in] buffer The buffer to write from.
* @param[in] length The minimum number of bytes to write.
* @param[in] offset The offset in the buffer to write from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pwrite`.
* @return otherwise What `scheduler.io_pwrite` returns.
@ -321,27 +321,55 @@ VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VAL
* Non-blocking read from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[out] buffer Return buffer.
* @param[in] size Size of the return buffer.
* @param[in] length Requested number of bytes to read.
* @param[in] io An io object to read from.
* @param[in] base The memory to read to.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
*/
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length);
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length);
/**
* Non-blocking write to the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] buffer What to write.
* @param[in] size Size of the buffer.
* @param[in] length Number of bytes to write.
* @param[in] io An io object to write to.
* @param[in] base The memory to write from.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
*/
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length);
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length);
/**
* Non-blocking pread from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[in] io An io object to read from.
* @param[in] from The offset to read from.
* @param[in] base The memory to read to.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pread`.
* @return otherwise What `scheduler.io_pread` returns.
*/
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length);
/**
* Non-blocking pwrite to the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[in] io An io object to write to.
* @param[in] from The offset to write to.
* @param[in] base The memory to write from.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pwrite`.
* @return otherwise What `scheduler.io_pwrite` returns.
*/
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length);
/**
* Non-blocking close the given IO.

Просмотреть файл

@ -75,7 +75,9 @@ VALUE rb_io_buffer_map(VALUE io, size_t size, rb_off_t offset, enum rb_io_buffer
VALUE rb_io_buffer_lock(VALUE self);
VALUE rb_io_buffer_unlock(VALUE self);
int rb_io_buffer_try_unlock(VALUE self);
VALUE rb_io_buffer_free(VALUE self);
VALUE rb_io_buffer_free_locked(VALUE self);
int rb_io_buffer_get_bytes(VALUE self, void **base, size_t *size);
void rb_io_buffer_get_bytes_for_reading(VALUE self, const void **base, size_t *size);

42
io.c
Просмотреть файл

@ -6066,6 +6066,7 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io)
#if defined(HAVE_PREAD) || defined(HAVE_PWRITE)
struct prdwr_internal_arg {
VALUE io;
int fd;
void *buf;
size_t count;
@ -6075,17 +6076,28 @@ struct prdwr_internal_arg {
#if defined(HAVE_PREAD)
static VALUE
internal_pread_func(void *arg)
internal_pread_func(void *_arg)
{
struct prdwr_internal_arg *p = arg;
return (VALUE)pread(p->fd, p->buf, p->count, p->offset);
struct prdwr_internal_arg *arg = _arg;
return (VALUE)pread(arg->fd, arg->buf, arg->count, arg->offset);
}
static VALUE
pread_internal_call(VALUE arg)
pread_internal_call(VALUE _arg)
{
struct prdwr_internal_arg *p = (struct prdwr_internal_arg *)arg;
return rb_thread_io_blocking_region(internal_pread_func, p, p->fd);
struct prdwr_internal_arg *arg = (struct prdwr_internal_arg *)_arg;
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pread_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0);
if (!UNDEF_P(result)) {
return rb_fiber_scheduler_io_result_apply(result);
}
}
return rb_thread_io_blocking_region(internal_pread_func, arg, arg->fd);
}
/*
@ -6122,7 +6134,7 @@ rb_io_pread(int argc, VALUE *argv, VALUE io)
VALUE len, offset, str;
rb_io_t *fptr;
ssize_t n;
struct prdwr_internal_arg arg;
struct prdwr_internal_arg arg = {.io = io};
int shrinkable;
rb_scan_args(argc, argv, "21", &len, &offset, &str);
@ -6158,9 +6170,19 @@ rb_io_pread(int argc, VALUE *argv, VALUE io)
#if defined(HAVE_PWRITE)
static VALUE
internal_pwrite_func(void *ptr)
internal_pwrite_func(void *_arg)
{
struct prdwr_internal_arg *arg = ptr;
struct prdwr_internal_arg *arg = _arg;
/*
 * Give the current fiber scheduler (if any) the first chance to perform the
 * positional write; fall back to a plain pwrite(2) when there is no
 * scheduler or the scheduler hook returns undef.
 *
 * NOTE(review): the pread path consults the scheduler in
 * pread_internal_call, i.e. *before* calling rb_thread_io_blocking_region,
 * and keeps internal_pread_func as a bare pread(2). Here the scheduler hook
 * lives in the function that performs the raw system call. If this function
 * is still handed to rb_thread_io_blocking_region (caller not visible in
 * this hunk -- confirm), the hook would run inside the blocking region;
 * consider mirroring the pread layout with a separate pwrite_internal_call.
 */
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pwrite_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0);
if (!UNDEF_P(result)) {
return rb_fiber_scheduler_io_result_apply(result);
}
}
return (VALUE)pwrite(arg->fd, arg->buf, arg->count, arg->offset);
}
@ -6195,7 +6217,7 @@ rb_io_pwrite(VALUE io, VALUE str, VALUE offset)
{
rb_io_t *fptr;
ssize_t n;
struct prdwr_internal_arg arg;
struct prdwr_internal_arg arg = {.io = io};
VALUE tmp;
if (!RB_TYPE_P(str, T_STRING))

Просмотреть файл

@ -1001,17 +1001,23 @@ rb_io_buffer_lock(VALUE self)
return self;
}
// Clear the RB_IO_BUFFER_LOCKED flag on `data`.
// Raises rb_eIOBufferLockedError ("Buffer not locked!") when the flag is not
// currently set, leaving the flags untouched.
static void
io_buffer_unlock(struct rb_io_buffer *data)
{
    if (data->flags & RB_IO_BUFFER_LOCKED) {
        data->flags &= ~RB_IO_BUFFER_LOCKED;
        return;
    }

    rb_raise(rb_eIOBufferLockedError, "Buffer not locked!");
}
VALUE
rb_io_buffer_unlock(VALUE self)
{
struct rb_io_buffer *data = NULL;
TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
if (!(data->flags & RB_IO_BUFFER_LOCKED)) {
rb_raise(rb_eIOBufferLockedError, "Buffer not locked!");
}
data->flags &= ~RB_IO_BUFFER_LOCKED;
io_buffer_unlock(data);
return self;
}
@ -1123,6 +1129,17 @@ rb_io_buffer_free(VALUE self)
return self;
}
// Unlock the buffer wrapped by `self`, then free it, and return `self`.
// The unlock step runs first, so io_buffer_free is only reached when
// io_buffer_unlock returns normally.
VALUE
rb_io_buffer_free_locked(VALUE self)
{
    struct rb_io_buffer *buffer = NULL;
    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);

    io_buffer_unlock(buffer);
    io_buffer_free(buffer);

    return self;
}
// Validate that access to the buffer is within bounds, assuming you want to
// access length bytes from the specified offset.
static inline void

Просмотреть файл

@ -458,15 +458,15 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
/*
* Document-method: Fiber::Scheduler#io_read
* call-seq: io_read(io, buffer, minimum_length) -> read length or -errno
* call-seq: io_read(io, buffer, length, offset) -> read length or -errno
*
* Invoked by IO#read or IO::Buffer#read to read +length+ bytes from +io+ into a
* specified +buffer+ (see IO::Buffer).
* specified +buffer+ (see IO::Buffer) at the given +offset+.
*
* The +minimum_length+ argument is the "minimum length to be read". If the IO
* buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be
* read, but at least 1KiB will be. Generally, the only case where less data
* than +length+ will be read is if there is an error reading the data.
* The +length+ argument is the "minimum length to be read". If the IO buffer
* size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
* but at least 1KiB will be. Generally, the only case where less data than
* +length+ will be read is if there is an error reading the data.
*
* Specifying a +length+ of 0 is valid and means try reading at least once and
* return any available data.
@ -492,13 +492,19 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
/*
* Document-method: Fiber::Scheduler#io_pread
* call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
*
* Invoked by IO::Buffer#pread. See that method for description of arguments.
* Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
* at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
* +offset+.
*
* This method is semantically the same as #io_read, but it allows specifying
* the offset to read from and is often better for asynchronous IO on the same
* file.
*
* The method should be considered _experimental_.
*/
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
@ -512,16 +518,16 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
/*
* Document-method: Fiber::Scheduler#io_write
* call-seq: io_write(io, buffer, minimum_length) -> written length or -errno
* call-seq: io_write(io, buffer, length, offset) -> written length or -errno
*
* Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+
* from a specified +buffer+ (see IO::Buffer).
* from a specified +buffer+ (see IO::Buffer) at the given +offset+.
*
* The +minimum_length+ argument is the "minimum length to be written". If the
* IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most
* 8KiB will be written, but at least 1KiB will be. Generally, the only case
* where less data than +minimum_length+ will be written is if there is an
* error writing the data.
* The +length+ argument is the "minimum length to be written". If the IO
* buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
* will be written, but at least 1KiB will be. Generally, the only case where
* less data than +length+ will be written is if there is an error writing the
* data.
*
* Specifying a +length+ of 0 is valid and means try writing at least once, as
* much data as possible.
@ -552,7 +558,15 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
* Document-method: Fiber::Scheduler#io_pwrite
* call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
*
* Invoked by IO::Buffer#pwrite. See that method for description of arguments.
* Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
* at offset +from+ from a specified +buffer+ (see IO::Buffer) at the given
* +offset+.
*
* This method is semantically the same as #io_write, but it allows specifying
* the offset to write to and is often better for asynchronous IO on the same
* file.
*
* The method should be considered _experimental_.
*
*/
VALUE
@ -572,8 +586,7 @@ rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
rb_io_buffer_unlock(buffer);
rb_io_buffer_free(buffer);
rb_io_buffer_free_locked(buffer);
return result;
}
@ -585,8 +598,31 @@ rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base,
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
rb_io_buffer_unlock(buffer);
rb_io_buffer_free(buffer);
rb_io_buffer_free_locked(buffer);
return result;
}
/*
 * Positional read into native memory via the fiber scheduler.
 *
 * Wraps `base` (`size` bytes) in a temporary locked IO buffer object, forwards
 * to rb_fiber_scheduler_io_pread with a buffer offset of 0, then unlocks and
 * frees the wrapper. Returns whatever rb_fiber_scheduler_io_pread returned.
 */
VALUE
rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
{
    VALUE wrapper = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
    VALUE outcome = rb_fiber_scheduler_io_pread(scheduler, io, from, wrapper, length, 0);

    rb_io_buffer_free_locked(wrapper);

    return outcome;
}
/*
 * Positional write from native memory via the fiber scheduler.
 *
 * Wraps `base` (`size` bytes) in a temporary IO buffer object that is both
 * locked and read-only, forwards to rb_fiber_scheduler_io_pwrite with a buffer
 * offset of 0, then unlocks and frees the wrapper. Returns whatever
 * rb_fiber_scheduler_io_pwrite returned.
 */
VALUE
rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
{
    /* The const is cast away only to build the wrapper; READONLY marks it as not writable. */
    VALUE wrapper = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
    VALUE outcome = rb_fiber_scheduler_io_pwrite(scheduler, io, from, wrapper, length, 0);

    rb_io_buffer_free_locked(wrapper);

    return outcome;
}

Просмотреть файл

@ -366,6 +366,64 @@ class IOBufferScheduler < Scheduler
return total
end
# Scheduler hook: fill +buffer+ (starting at buffer position +offset+) with
# data read from +io+, beginning at file position +from+.
#
# Positional reads are repeated until at least +length+ bytes have been
# transferred or a read returns 0. Returns the number of bytes transferred,
# or the negative result of the read that failed. When a read returns EAGAIN
# and +length+ is 0 that result is returned as-is; otherwise the fiber waits
# for +io+ to become readable and retries.
def io_pread(io, buffer, from, length, offset)
  io.nonblock = true
  transferred = 0

  while true
    capacity = buffer.size - offset
    count = blocking { buffer.pread(io, from, capacity, offset) }

    if count > 0
      # Advance both the buffer cursor and the file position by what was read.
      transferred += count
      offset += count
      from += count
      break if transferred >= length

      next
    end

    break if count == 0

    if count == EAGAIN
      return count unless length > 0

      io_wait(io, IO::READABLE, nil)
    elsif count < 0
      return count
    end
  end

  transferred
end
# Scheduler hook: write the contents of +buffer+ (starting at buffer position
# +offset+) to +io+, beginning at file position +from+.
#
# Positional writes are repeated until at least +length+ bytes have been
# transferred or a write returns 0. Returns the number of bytes transferred,
# or the negative result of the write that failed. When a write returns
# EAGAIN and +length+ is 0 that result is returned as-is; otherwise the fiber
# waits for +io+ to become writable and retries.
def io_pwrite(io, buffer, from, length, offset)
  io.nonblock = true
  transferred = 0

  while true
    remaining = buffer.size - offset
    count = blocking { buffer.pwrite(io, from, remaining, offset) }

    if count > 0
      # Advance both the buffer cursor and the file position by what was written.
      transferred += count
      offset += count
      from += count
      break if transferred >= length

      next
    end

    break if count == 0

    if count == EAGAIN
      return count unless length > 0

      io_wait(io, IO::WRITABLE, nil)
    elsif count < 0
      return count
    end
  end

  transferred
end
# Forward the given block to Fiber.blocking and return its result.
def blocking(&block) = Fiber.blocking(&block)

Просмотреть файл

@ -155,4 +155,45 @@ class TestFiberIOBuffer < Test::Unit::TestCase
i&.close
o&.close
end
# Probe whether +io+ can be used in non-blocking mode: returns true when
# calling +io.nonblock+ with an empty block succeeds, false when it raises a
# StandardError.
def nonblockable?(io)
  begin
    io.nonblock { }
  rescue StandardError
    return false
  end

  true
end
# Round-trips a buffer through a temporary file using IO::Buffer#pwrite and
# IO::Buffer#pread: once directly, and once inside a fiber scheduled by an
# IOBufferScheduler on a separate thread. Omitted when the temporary file
# cannot be switched to non-blocking mode.
def test_io_buffer_pread_pwrite
  file = Tempfile.new("test_io_buffer_pread_pwrite")

  omit "Non-blocking file IO is not supported" unless nonblockable?(file)

  source = IO::Buffer.for("Hello World!")
  destination = IO::Buffer.new(source.size)

  # Write the source at file offset 1, then read it back from the same place.
  round_trip = lambda do
    source.pwrite(file, 1, source.size)
    destination.pread(file, 1, source.size)
  end

  # Non-scheduler code path:
  round_trip.call
  assert_equal source, destination

  # Scheduler code path, starting again from an empty file and a zeroed buffer:
  destination.clear
  file.truncate(0)

  Thread.new do
    Fiber.set_scheduler IOBufferScheduler.new

    Fiber.schedule { round_trip.call }
  end.join

  assert_equal source, destination
ensure
  file&.close!
end
end