зеркало из https://github.com/github/ruby.git
* thread.c (rb_thread_io_blocking_region): new function to run
blocking region with GIL released, for fd. * thread.c (rb_thread_fd_close): implement. [ruby-core:35203] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@30852 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
Родитель
dda8de065c
Коммит
58b325366d
|
@ -1,4 +1,9 @@
|
|||
Sat Feb 12 14:42:11 2011 Nobuyoshi Nakada <nobu@ruby-lang.org>
|
||||
Sat Feb 12 14:44:20 2011 Nobuyoshi Nakada <nobu@ruby-lang.org>
|
||||
|
||||
* thread.c (rb_thread_io_blocking_region): new function to run
|
||||
blocking region with GIL released, for fd.
|
||||
|
||||
* thread.c (rb_thread_fd_close): implement. [ruby-core:35203]
|
||||
|
||||
* vm.c (th_init): rename from th_init2.
|
||||
|
||||
|
|
|
@ -559,7 +559,7 @@ rsock_bsock_send(int argc, VALUE *argv, VALUE sock)
|
|||
arg.fd = fptr->fd;
|
||||
arg.flags = NUM2INT(flags);
|
||||
while (rb_thread_fd_writable(arg.fd),
|
||||
(n = (int)BLOCKING_REGION(func, &arg)) < 0) {
|
||||
(n = (int)BLOCKING_REGION_FD(func, &arg)) < 0) {
|
||||
if (rb_io_wait_writable(arg.fd)) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -129,7 +129,7 @@ rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from)
|
|||
|
||||
while (rb_io_check_closed(fptr),
|
||||
rb_thread_wait_fd(arg.fd),
|
||||
(slen = BLOCKING_REGION(recvfrom_blocking, &arg)) < 0) {
|
||||
(slen = BLOCKING_REGION_FD(recvfrom_blocking, &arg)) < 0) {
|
||||
if (!rb_io_wait_readable(fptr->fd)) {
|
||||
rb_sys_fail("recvfrom(2)");
|
||||
}
|
||||
|
@ -380,7 +380,7 @@ rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks)
|
|||
if (socks) func = socks_connect_blocking;
|
||||
#endif
|
||||
for (;;) {
|
||||
status = (int)BLOCKING_REGION(func, &arg);
|
||||
status = (int)BLOCKING_REGION_FD(func, &arg);
|
||||
if (status < 0) {
|
||||
switch (errno) {
|
||||
case EAGAIN:
|
||||
|
@ -515,7 +515,7 @@ rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len)
|
|||
arg.len = len;
|
||||
retry:
|
||||
rb_thread_wait_fd(fd);
|
||||
fd2 = (int)BLOCKING_REGION(accept_blocking, &arg);
|
||||
fd2 = (int)BLOCKING_REGION_FD(accept_blocking, &arg);
|
||||
if (fd2 < 0) {
|
||||
switch (errno) {
|
||||
case EMFILE:
|
||||
|
|
|
@ -197,6 +197,7 @@ int Rconnect();
|
|||
#include "constdefs.h"
|
||||
|
||||
#define BLOCKING_REGION(func, arg) (long)rb_thread_blocking_region((func), (arg), RUBY_UBF_IO, 0)
|
||||
#define BLOCKING_REGION_FD(func, arg) (long)rb_thread_io_blocking_region((func), (arg), (arg)->fd)
|
||||
|
||||
#define SockAddrStringValue(v) rsock_sockaddr_string_value(&(v))
|
||||
#define SockAddrStringValuePtr(v) rsock_sockaddr_string_value_ptr(&(v))
|
||||
|
|
|
@ -177,7 +177,7 @@ udp_send(int argc, VALUE *argv, VALUE sock)
|
|||
arg.to = res->ai_addr;
|
||||
arg.tolen = res->ai_addrlen;
|
||||
rb_thread_fd_writable(arg.fd);
|
||||
n = (int)BLOCKING_REGION(rsock_sendto_blocking, &arg);
|
||||
n = (int)BLOCKING_REGION_FD(rsock_sendto_blocking, &arg);
|
||||
if (n >= 0) {
|
||||
freeaddrinfo(res0);
|
||||
return INT2FIX(n);
|
||||
|
|
|
@ -249,7 +249,7 @@ unix_send_io(VALUE sock, VALUE val)
|
|||
|
||||
arg.fd = fptr->fd;
|
||||
rb_thread_fd_writable(arg.fd);
|
||||
if ((int)BLOCKING_REGION(sendmsg_blocking, &arg) == -1)
|
||||
if ((int)BLOCKING_REGION_FD(sendmsg_blocking, &arg) == -1)
|
||||
rb_sys_fail("sendmsg(2)");
|
||||
|
||||
return Qnil;
|
||||
|
@ -335,7 +335,7 @@ unix_recv_io(int argc, VALUE *argv, VALUE sock)
|
|||
|
||||
arg.fd = fptr->fd;
|
||||
rb_thread_wait_fd(arg.fd);
|
||||
if ((int)BLOCKING_REGION(recvmsg_blocking, &arg) == -1)
|
||||
if ((int)BLOCKING_REGION_FD(recvmsg_blocking, &arg) == -1)
|
||||
rb_sys_fail("recvmsg(2)");
|
||||
|
||||
#if FD_PASSING_BY_MSG_CONTROL
|
||||
|
|
|
@ -797,6 +797,7 @@ void rb_thread_check_ints(void);
|
|||
int rb_thread_interrupted(VALUE thval);
|
||||
VALUE rb_thread_blocking_region(rb_blocking_function_t *func, void *data1,
|
||||
rb_unblock_function_t *ubf, void *data2);
|
||||
VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);
|
||||
#define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
|
||||
#define RUBY_UBF_PROCESS ((rb_unblock_function_t *)-1)
|
||||
VALUE rb_mutex_new(void);
|
||||
|
|
9
io.c
9
io.c
|
@ -605,7 +605,7 @@ rb_read_internal(int fd, void *buf, size_t count)
|
|||
iis.buf = buf;
|
||||
iis.capa = count;
|
||||
|
||||
return (ssize_t)rb_thread_blocking_region(internal_read_func, &iis, RUBY_UBF_IO, 0);
|
||||
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd);
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
|
@ -616,7 +616,7 @@ rb_write_internal(int fd, const void *buf, size_t count)
|
|||
iis.buf = buf;
|
||||
iis.capa = count;
|
||||
|
||||
return (ssize_t)rb_thread_blocking_region(internal_write_func, &iis, RUBY_UBF_IO, 0);
|
||||
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fd);
|
||||
}
|
||||
|
||||
static long
|
||||
|
@ -653,7 +653,8 @@ io_flush_buffer_sync(void *arg)
|
|||
static VALUE
|
||||
io_flush_buffer_async(VALUE arg)
|
||||
{
|
||||
return rb_thread_blocking_region(io_flush_buffer_sync, (void *)arg, RUBY_UBF_IO, 0);
|
||||
rb_io_t *fptr = (rb_io_t *)arg;
|
||||
return rb_thread_io_blocking_region(io_flush_buffer_sync, fptr, fptr->fd);
|
||||
}
|
||||
|
||||
static inline int
|
||||
|
@ -7475,7 +7476,7 @@ do_io_advise(rb_io_t *fptr, VALUE advice, off_t offset, off_t len)
|
|||
ias.offset = offset;
|
||||
ias.len = len;
|
||||
|
||||
if (rv = (int)rb_thread_blocking_region(io_advise_internal, &ias, RUBY_UBF_IO, 0))
|
||||
if (rv = (int)rb_thread_io_blocking_region(io_advise_internal, &ias, fptr->fd))
|
||||
/* posix_fadvise(2) doesn't set errno. On success it returns 0; otherwise
|
||||
it returns the error code. */
|
||||
rb_syserr_fail(rv, RSTRING_PTR(fptr->pathv));
|
||||
|
|
|
@ -410,4 +410,24 @@ class TestSocket < Test::Unit::TestCase
|
|||
assert_equal(stamp.data[-8,8].unpack("Q")[0], t.subsec * 2**64)
|
||||
end
|
||||
|
||||
def test_closed_read
|
||||
require 'timeout'
|
||||
require 'socket'
|
||||
bug4390 = '[ruby-core:35203]'
|
||||
server = TCPServer.new("localhost", 0)
|
||||
serv_thread = Thread.new {server.accept}
|
||||
begin sleep(0.1) end until serv_thread.stop?
|
||||
sock = TCPSocket.new("localhost", server.addr[1])
|
||||
client_thread = Thread.new do
|
||||
sock.readline
|
||||
end
|
||||
begin sleep(0.1) end until client_thread.stop?
|
||||
Timeout.timeout(1) do
|
||||
sock.close
|
||||
sock = nil
|
||||
assert_raise(IOError, bug4390) {client_thread.join}
|
||||
end
|
||||
ensure
|
||||
server.close
|
||||
end
|
||||
end if defined?(Socket)
|
||||
|
|
52
thread.c
52
thread.c
|
@ -73,6 +73,8 @@ static const VALUE eKillSignal = INT2FIX(0);
|
|||
static const VALUE eTerminateSignal = INT2FIX(1);
|
||||
static volatile int system_working = 1;
|
||||
|
||||
#define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream]
|
||||
|
||||
inline static void
|
||||
st_delete_wrap(st_table *table, st_data_t key)
|
||||
{
|
||||
|
@ -1122,6 +1124,7 @@ rb_thread_blocking_region(
|
|||
rb_thread_t *th = GET_THREAD();
|
||||
int saved_errno = 0;
|
||||
|
||||
th->waiting_fd = -1;
|
||||
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
|
||||
ubf = ubf_select;
|
||||
data2 = th;
|
||||
|
@ -1136,6 +1139,23 @@ rb_thread_blocking_region(
|
|||
return val;
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
||||
{
|
||||
VALUE val;
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
int saved_errno = 0;
|
||||
|
||||
th->waiting_fd = fd;
|
||||
BLOCKING_REGION({
|
||||
val = func(data1);
|
||||
saved_errno = errno;
|
||||
}, ubf_select, th);
|
||||
errno = saved_errno;
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
/* alias of rb_thread_blocking_region() */
|
||||
|
||||
VALUE
|
||||
|
@ -1427,10 +1447,36 @@ rb_threadptr_reset_raised(rb_thread_t *th)
|
|||
return 1;
|
||||
}
|
||||
|
||||
#define THREAD_IO_WAITING_P(th) ( \
|
||||
((th)->status == THREAD_STOPPED || \
|
||||
(th)->status == THREAD_STOPPED_FOREVER) && \
|
||||
(th)->blocking_region_buffer && \
|
||||
(th)->unblock.func == ubf_select && \
|
||||
1)
|
||||
|
||||
static int
|
||||
thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
|
||||
{
|
||||
int fd = (int)data;
|
||||
rb_thread_t *th;
|
||||
GetThreadPtr((VALUE)key, th);
|
||||
|
||||
if (THREAD_IO_WAITING_P(th)) {
|
||||
native_mutex_lock(&th->interrupt_lock);
|
||||
if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
|
||||
th->errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
|
||||
RUBY_VM_SET_INTERRUPT(th);
|
||||
(th->unblock.func)(th->unblock.arg);
|
||||
}
|
||||
native_mutex_unlock(&th->interrupt_lock);
|
||||
}
|
||||
return ST_CONTINUE;
|
||||
}
|
||||
|
||||
void
|
||||
rb_thread_fd_close(int fd)
|
||||
{
|
||||
/* TODO: fix me */
|
||||
st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -4362,6 +4408,10 @@ Init_Thread(void)
|
|||
|
||||
rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);
|
||||
|
||||
closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed");
|
||||
OBJ_TAINT(closed_stream_error);
|
||||
OBJ_FREEZE(closed_stream_error);
|
||||
|
||||
cThGroup = rb_define_class("ThreadGroup", rb_cObject);
|
||||
rb_define_alloc_func(cThGroup, thgroup_s_alloc);
|
||||
rb_define_method(cThGroup, "list", thgroup_list, 0);
|
||||
|
|
|
@ -260,6 +260,7 @@ enum ruby_special_exceptions {
|
|||
ruby_error_reenter,
|
||||
ruby_error_nomemory,
|
||||
ruby_error_sysstack,
|
||||
ruby_error_closed_stream,
|
||||
ruby_special_error_count
|
||||
};
|
||||
|
||||
|
@ -395,6 +396,8 @@ typedef struct rb_thread_struct {
|
|||
/* passing state */
|
||||
int state;
|
||||
|
||||
int waiting_fd;
|
||||
|
||||
/* for rb_iterate */
|
||||
const rb_block_t *passed_block;
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче