Direct io for accept, send, sendmsg, recvfrom, and related methods.

This commit is contained in:
Samuel Williams 2021-05-09 00:13:47 +12:00
Родитель ff609eee98
Коммит 3deb5d7113
8 изменённых файлов: 124 добавлений и 87 удалений

Просмотреть файл

@ -566,7 +566,7 @@ rsock_bsock_send(int argc, VALUE *argv, VALUE sock)
arg.flags = NUM2INT(flags);
while (rsock_maybe_fd_writable(arg.fd),
(n = (ssize_t)BLOCKING_REGION_FD(func, &arg)) < 0) {
if (rb_io_wait_writable(arg.fd)) {
if (rb_io_maybe_wait_writable(errno, sock, Qnil)) {
continue;
}
rb_sys_fail(funcname);

Просмотреть файл

@ -166,7 +166,7 @@ recvfrom_locktmp(VALUE v)
}
VALUE
rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from)
rsock_s_recvfrom(VALUE socket, int argc, VALUE *argv, enum sock_recv_type from)
{
rb_io_t *fptr;
VALUE str;
@ -177,27 +177,35 @@ rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from)
rb_scan_args(argc, argv, "12", &len, &flg, &str);
if (flg == Qnil) arg.flags = 0;
else arg.flags = NUM2INT(flg);
if (flg == Qnil)
arg.flags = 0;
else
arg.flags = NUM2INT(flg);
buflen = NUM2INT(len);
str = rsock_strbuf(str, buflen);
GetOpenFile(sock, fptr);
RB_IO_POINTER(socket, fptr);
if (rb_io_read_pending(fptr)) {
rb_raise(rb_eIOError, "recv for buffered IO");
rb_raise(rb_eIOError, "recv for buffered IO");
}
arg.fd = fptr->fd;
arg.alen = (socklen_t)sizeof(arg.buf);
arg.str = str;
arg.length = buflen;
while (rb_io_check_closed(fptr),
rsock_maybe_wait_fd(arg.fd),
(slen = (long)rb_str_locktmp_ensure(str, recvfrom_locktmp,
(VALUE)&arg)) < 0) {
if (!rb_io_wait_readable(fptr->fd)) {
while (true) {
rb_io_check_closed(fptr);
rsock_maybe_wait_fd(arg.fd);
slen = (long)rb_str_locktmp_ensure(str, recvfrom_locktmp, (VALUE)&arg);
if (slen >= 0) break;
if (!rb_io_maybe_wait_readable(errno, socket, Qnil))
rb_sys_fail("recvfrom(2)");
}
}
/* Resize the string to the amount of data received */
@ -221,7 +229,7 @@ rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from)
return rb_assoc_new(str, rsock_unixaddr(&arg.buf.un, arg.alen));
#endif
case RECV_SOCKET:
return rb_assoc_new(str, rsock_io_socket_addrinfo(sock, &arg.buf.addr, arg.alen));
return rb_assoc_new(str, rsock_io_socket_addrinfo(socket, &arg.buf.addr, arg.alen));
default:
rb_bug("rsock_s_recvfrom called with bad value");
}
@ -682,38 +690,47 @@ accept_blocking(void *data)
}
VALUE
rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len)
rsock_s_accept(VALUE klass, VALUE io, struct sockaddr *sockaddr, socklen_t *len)
{
int fd2;
int retry = 0;
struct accept_arg arg;
rb_io_t *fptr = NULL;
RB_IO_POINTER(io, fptr);
struct accept_arg accept_arg = {
.fd = fptr->fd,
.sockaddr = sockaddr,
.len = len
};
int retry = 0;
arg.fd = fd;
arg.sockaddr = sockaddr;
arg.len = len;
retry:
rsock_maybe_wait_fd(fd);
fd2 = (int)BLOCKING_REGION_FD(accept_blocking, &arg);
if (fd2 < 0) {
int e = errno;
switch (e) {
case EMFILE:
case ENFILE:
case ENOMEM:
if (retry) break;
rb_gc();
retry = 1;
goto retry;
default:
if (!rb_io_wait_readable(fd)) break;
retry = 0;
goto retry;
}
rb_syserr_fail(e, "accept(2)");
rsock_maybe_wait_fd(accept_arg.fd);
int peer = (int)BLOCKING_REGION_FD(accept_blocking, &accept_arg);
if (peer < 0) {
int error = errno;
switch (error) {
case EMFILE:
case ENFILE:
case ENOMEM:
if (retry) break;
rb_gc();
retry = 1;
goto retry;
default:
if (!rb_io_maybe_wait_readable(error, io, Qnil)) break;
retry = 0;
goto retry;
}
rb_syserr_fail(error, "accept(2)");
}
rb_update_max_fd(fd2);
if (!klass) return INT2NUM(fd2);
return rsock_init_sock(rb_obj_alloc(klass), fd2);
rb_update_max_fd(peer);
if (!klass) return INT2NUM(peer);
return rsock_init_sock(rb_obj_alloc(klass), peer);
}
int

Просмотреть файл

@ -373,7 +373,7 @@ VALUE rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type fr
int rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks, struct timeval *timeout);
VALUE rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len);
VALUE rsock_s_accept(VALUE klass, VALUE io, struct sockaddr *sockaddr, socklen_t *len);
VALUE rsock_s_accept_nonblock(VALUE klass, VALUE ex, rb_io_t *fptr,
struct sockaddr *sockaddr, socklen_t *len);
VALUE rsock_sock_listen(VALUE sock, VALUE log);

Просмотреть файл

@ -750,17 +750,14 @@ sock_recvfrom_nonblock(VALUE sock, VALUE len, VALUE flg, VALUE str, VALUE ex)
*
*/
static VALUE
sock_accept(VALUE sock)
sock_accept(VALUE server)
{
rb_io_t *fptr;
VALUE sock2;
union_sockaddr buf;
socklen_t len = (socklen_t)sizeof buf;
union_sockaddr buffer;
socklen_t length = (socklen_t)sizeof(buffer);
GetOpenFile(sock, fptr);
sock2 = rsock_s_accept(rb_cSocket,fptr->fd,&buf.addr,&len);
VALUE peer = rsock_s_accept(rb_cSocket, server, &buffer.addr, &length);
return rb_assoc_new(sock2, rsock_io_socket_addrinfo(sock2, &buf.addr, len));
return rb_assoc_new(peer, rsock_io_socket_addrinfo(peer, &buffer.addr, length));
}
/* :nodoc: */
@ -820,17 +817,14 @@ sock_accept_nonblock(VALUE sock, VALUE ex)
* * Socket#accept
*/
static VALUE
sock_sysaccept(VALUE sock)
sock_sysaccept(VALUE server)
{
rb_io_t *fptr;
VALUE sock2;
union_sockaddr buf;
socklen_t len = (socklen_t)sizeof buf;
union_sockaddr buffer;
socklen_t length = (socklen_t)sizeof(buffer);
GetOpenFile(sock, fptr);
sock2 = rsock_s_accept(0,fptr->fd,&buf.addr,&len);
VALUE peer = rsock_s_accept(0, server, &buffer.addr, &length);
return rb_assoc_new(sock2, rsock_io_socket_addrinfo(sock2, &buf.addr, len));
return rb_assoc_new(peer, rsock_io_socket_addrinfo(peer, &buffer.addr, length));
}
#ifdef HAVE_GETHOSTNAME

Просмотреть файл

@ -53,15 +53,12 @@ tcp_svr_init(int argc, VALUE *argv, VALUE sock)
*
*/
static VALUE
tcp_accept(VALUE sock)
tcp_accept(VALUE server)
{
rb_io_t *fptr;
union_sockaddr from;
socklen_t fromlen;
union_sockaddr buffer;
socklen_t length = sizeof(buffer);
GetOpenFile(sock, fptr);
fromlen = (socklen_t)sizeof(from);
return rsock_s_accept(rb_cTCPSocket, fptr->fd, &from.addr, &fromlen);
return rsock_s_accept(rb_cTCPSocket, server, &buffer.addr, &length);
}
/* :nodoc: */
@ -91,15 +88,12 @@ tcp_accept_nonblock(VALUE sock, VALUE ex)
*
*/
static VALUE
tcp_sysaccept(VALUE sock)
tcp_sysaccept(VALUE server)
{
rb_io_t *fptr;
union_sockaddr from;
socklen_t fromlen;
union_sockaddr buffer;
socklen_t length = sizeof(buffer);
GetOpenFile(sock, fptr);
fromlen = (socklen_t)sizeof(from);
return rsock_s_accept(0, fptr->fd, &from.addr, &fromlen);
return rsock_s_accept(0, server, &buffer.addr, &length);
}
void

Просмотреть файл

@ -47,16 +47,12 @@ unix_svr_init(VALUE sock, VALUE path)
*
*/
static VALUE
unix_accept(VALUE sock)
unix_accept(VALUE server)
{
rb_io_t *fptr;
struct sockaddr_un from;
socklen_t fromlen;
struct sockaddr_un buffer;
socklen_t length = sizeof(buffer);
GetOpenFile(sock, fptr);
fromlen = (socklen_t)sizeof(struct sockaddr_un);
return rsock_s_accept(rb_cUNIXSocket, fptr->fd,
(struct sockaddr*)&from, &fromlen);
return rsock_s_accept(rb_cUNIXSocket, server, (struct sockaddr*)&buffer, &length);
}
/* :nodoc: */
@ -92,15 +88,12 @@ unix_accept_nonblock(VALUE sock, VALUE ex)
*
*/
static VALUE
unix_sysaccept(VALUE sock)
unix_sysaccept(VALUE server)
{
rb_io_t *fptr;
struct sockaddr_un from;
socklen_t fromlen;
struct sockaddr_un buffer;
socklen_t length = sizeof(buffer);
GetOpenFile(sock, fptr);
fromlen = (socklen_t)sizeof(struct sockaddr_un);
return rsock_s_accept(0, fptr->fd, (struct sockaddr*)&from, &fromlen);
return rsock_s_accept(0, server, (struct sockaddr*)&buffer, &length);
}
#endif

Просмотреть файл

@ -159,6 +159,9 @@ int rb_io_wait_writable(int fd);
int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
VALUE rb_io_wait(VALUE io, VALUE events, VALUE timeout);
VALUE rb_io_maybe_wait(int error, VALUE io, VALUE events, VALUE timeout);
int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout);
int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout);
/* compatibility for ruby 1.8 and older */
#define rb_io_mode_flags(modestr) [<"rb_io_mode_flags() is obsolete; use rb_io_modestr_fmode()">]

36
io.c
Просмотреть файл

@ -1392,6 +1392,42 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
return rb_thread_wait_for_single_fd(fd, events, timeout);
}
VALUE rb_io_maybe_wait(int error, VALUE io, VALUE events, VALUE timeout)
{
switch (error) {
case EINTR:
#if defined(ERESTART)
case ERESTART:
#endif
// We might have pending interrupts since the previous syscall was interrupted:
rb_thread_check_ints();
// The operation was interrupted, so retry it immediately:
return events;
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
// The operation would block, so wait for the specified events:
return rb_io_wait(io, events, timeout);
default:
// Non-specific error, no event is ready:
return RB_INT2NUM(0);
}
}
int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout)
{
return RB_NUM2INT(rb_io_maybe_wait(error, io, RB_INT2NUM(RUBY_IO_READABLE), timeout));
}
int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout)
{
return RB_NUM2INT(rb_io_maybe_wait(error, io, RB_INT2NUM(RUBY_IO_WRITABLE), timeout));
}
static void
make_writeconv(rb_io_t *fptr)
{