Rename `rb_current_thread_scheduler` to `rb_thread_scheduler_if_nonblocking`.

Correctly capture thread before releasing GVL and pass as argument to
`rb_thread_scheduler_if_nonblocking`.
This commit is contained in:
Samuel Williams 2020-07-18 15:10:17 +12:00
Родитель 9f6a3d0306
Коммит f3462d99a3
4 изменённых файлов: 46 добавлений и 39 удалений

Просмотреть файл

@ -73,7 +73,8 @@ VALUE rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg);
VALUE rb_thread_scheduler_get(VALUE);
VALUE rb_thread_scheduler_set(VALUE, VALUE);
VALUE rb_current_thread_scheduler(void);
VALUE rb_thread_scheduler_if_nonblocking(VALUE thread);
RBIMPL_SYMBOL_EXPORT_END()

65
io.c
Просмотреть файл

@ -1044,6 +1044,7 @@ io_alloc(VALUE klass)
#endif
struct io_internal_read_struct {
VALUE th;
int fd;
int nonblock;
void *buf;
@ -1064,7 +1065,7 @@ struct io_internal_writev_struct {
};
#endif
static int nogvl_wait_for_single_fd(int fd, short events);
static int nogvl_wait_for_single_fd(VALUE th, int fd, short events);
static VALUE
internal_read_func(void *ptr)
{
@ -1075,7 +1076,7 @@ retry:
if (r < 0 && !iis->nonblock) {
int e = errno;
if (e == EAGAIN || e == EWOULDBLOCK) {
if (nogvl_wait_for_single_fd(iis->fd, RB_WAITFD_IN) != -1) {
if (nogvl_wait_for_single_fd(iis->th, iis->fd, RB_WAITFD_IN) != -1) {
goto retry;
}
errno = e;
@ -1118,12 +1119,13 @@ internal_writev_func(void *ptr)
static ssize_t
rb_read_internal(int fd, void *buf, size_t count)
{
struct io_internal_read_struct iis;
iis.fd = fd;
iis.nonblock = 0;
iis.buf = buf;
iis.capa = count;
struct io_internal_read_struct iis = {
.th = rb_thread_current(),
.fd = fd,
.nonblock = 0,
.buf = buf,
.capa = count
};
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd);
}
@ -1131,10 +1133,11 @@ rb_read_internal(int fd, void *buf, size_t count)
static ssize_t
rb_write_internal(int fd, const void *buf, size_t count)
{
struct io_internal_write_struct iis;
iis.fd = fd;
iis.buf = buf;
iis.capa = count;
struct io_internal_write_struct iis = {
.fd = fd,
.buf = buf,
.capa = count
};
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fd);
}
@ -1142,10 +1145,11 @@ rb_write_internal(int fd, const void *buf, size_t count)
static ssize_t
rb_write_internal2(int fd, const void *buf, size_t count)
{
struct io_internal_write_struct iis;
iis.fd = fd;
iis.buf = buf;
iis.capa = count;
struct io_internal_write_struct iis = {
.fd = fd,
.buf = buf,
.capa = count
};
return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis,
RUBY_UBF_IO, NULL);
@ -1155,10 +1159,11 @@ rb_write_internal2(int fd, const void *buf, size_t count)
static ssize_t
rb_writev_internal(int fd, const struct iovec *iov, int iovcnt)
{
struct io_internal_writev_struct iis;
iis.fd = fd;
iis.iov = iov;
iis.iovcnt = iovcnt;
struct io_internal_writev_struct iis = {
.fd = fd,
.iov = iov,
.iovcnt = iovcnt,
};
return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fd);
}
@ -1209,8 +1214,7 @@ io_flush_buffer_async2(VALUE arg)
rb_io_t *fptr = (rb_io_t *)arg;
VALUE ret;
ret = (VALUE)rb_thread_call_without_gvl2(io_flush_buffer_sync2, fptr,
RUBY_UBF_IO, NULL);
ret = (VALUE)rb_thread_call_without_gvl2(io_flush_buffer_sync2, fptr, RUBY_UBF_IO, NULL);
if (!ret) {
/* pending async interrupt is there. */
@ -1254,7 +1258,7 @@ io_fflush(rb_io_t *fptr)
int
rb_io_wait_readable(int f)
{
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f));
return RTEST(result);
@ -1284,7 +1288,7 @@ rb_io_wait_readable(int f)
int
rb_io_wait_writable(int f)
{
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f));
return RTEST(result);
@ -2897,6 +2901,7 @@ io_getpartial(int argc, VALUE *argv, VALUE io, int no_exception, int nonblock)
rb_io_set_nonblock(fptr);
}
io_setstrbuf(&str, len);
iis.th = rb_thread_current();
iis.fd = fptr->fd;
iis.nonblock = nonblock;
iis.buf = RSTRING_PTR(str);
@ -10921,9 +10926,9 @@ void * rb_thread_scheduler_wait_for_single_fd(void * _args) {
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
static int
nogvl_wait_for_single_fd(int fd, short events)
nogvl_wait_for_single_fd(VALUE th, int fd, short events)
{
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(th);
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
@ -10940,9 +10945,9 @@ nogvl_wait_for_single_fd(int fd, short events)
#else /* !USE_POLL */
# define IOWAIT_SYSCALL "select"
static int
nogvl_wait_for_single_fd(int fd, short events)
nogvl_wait_for_single_fd(VALUE th, int fd, short events)
{
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(th);
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
@ -10981,7 +10986,7 @@ maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
ret = rb_wait_for_single_fd(stp->src_fd, RB_WAITFD_IN, NULL);
}
else {
ret = nogvl_wait_for_single_fd(stp->src_fd, RB_WAITFD_IN);
ret = nogvl_wait_for_single_fd(stp->th, stp->src_fd, RB_WAITFD_IN);
}
} while (ret < 0 && maygvl_copy_stream_continue_p(has_gvl, stp));
@ -10999,7 +11004,7 @@ nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
int ret;
do {
ret = nogvl_wait_for_single_fd(stp->dst_fd, RB_WAITFD_OUT);
ret = nogvl_wait_for_single_fd(stp->th, stp->dst_fd, RB_WAITFD_OUT);
} while (ret < 0 && maygvl_copy_stream_continue_p(0, stp));
if (ret < 0) {

Просмотреть файл

@ -4923,7 +4923,7 @@ static VALUE
rb_f_sleep(int argc, VALUE *argv, VALUE _)
{
time_t beg = time(0);
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
rb_funcallv(scheduler, rb_intern("wait_sleep"), argc, argv);

Просмотреть файл

@ -3621,19 +3621,20 @@ VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler)
static VALUE
rb_thread_scheduler(VALUE klass)
{
return rb_current_thread_scheduler();
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
}
VALUE rb_current_thread_scheduler(void)
VALUE rb_thread_scheduler_if_nonblocking(VALUE thread)
{
rb_thread_t * th = GET_THREAD();
rb_thread_t * th = rb_thread_ptr(thread);
VM_ASSERT(th);
if (th->blocking == 0)
if (th->blocking == 0) {
return th->scheduler;
else
} else {
return Qnil;
}
}
static VALUE
@ -4234,7 +4235,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct waiting_fd wfd;
int state;
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
rb_thread_timeout(timeout)
@ -4376,7 +4377,7 @@ select_single_cleanup(VALUE ptr)
int
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
VALUE scheduler = rb_current_thread_scheduler();
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
rb_thread_timeout(timeout)