зеркало из https://github.com/github/ruby.git
Fix busy-loop when waiting for file descriptors to close
When one thread is closing a file descriptor whilst another thread is concurrently reading it, we need to wait for the reading thread to be done with it to prevent a potential EBADF (or, worse, file descriptor reuse). At the moment, that is done by keeping a list of threads still using the file descriptor in io_close_fptr. It then continually calls rb_thread_schedule() in fptr_finalize_flush until said list is empty. That busy-looping seems to behave rather poorly on some OS's, particulary FreeBSD. It can cause the TestIO#test_race_gets_and_close test to fail (even with its very long 200 second timeout) because the closing thread starves out the using thread. To fix that, I introduce the concept of struct rb_io_close_wait_list; a list of threads still using a file descriptor that we want to close. We call `rb_notify_fd_close` to let the thread scheduler know we're closing a FD, which fills the list with threads. Then, we call rb_notify_fd_close_wait which will block the thread until all of the still-using threads are done. This is implemented with a condition variable sleep, so no busy-looping is required.
This commit is contained in:
Родитель
54a74c4203
Коммит
66871c5a06
10
common.mk
10
common.mk
|
@ -6576,6 +6576,10 @@ explicit_bzero.$(OBJEXT): {$(VPATH)}internal/config.h
|
|||
explicit_bzero.$(OBJEXT): {$(VPATH)}internal/dllexport.h
|
||||
explicit_bzero.$(OBJEXT): {$(VPATH)}internal/has/attribute.h
|
||||
explicit_bzero.$(OBJEXT): {$(VPATH)}missing.h
|
||||
file.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h
|
||||
file.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h
|
||||
file.$(OBJEXT): $(CCAN_DIR)/list/list.h
|
||||
file.$(OBJEXT): $(CCAN_DIR)/str/str.h
|
||||
file.$(OBJEXT): $(hdrdir)/ruby/ruby.h
|
||||
file.$(OBJEXT): $(top_srcdir)/internal/array.h
|
||||
file.$(OBJEXT): $(top_srcdir)/internal/class.h
|
||||
|
@ -6773,6 +6777,7 @@ file.$(OBJEXT): {$(VPATH)}shape.h
|
|||
file.$(OBJEXT): {$(VPATH)}st.h
|
||||
file.$(OBJEXT): {$(VPATH)}subst.h
|
||||
file.$(OBJEXT): {$(VPATH)}thread.h
|
||||
file.$(OBJEXT): {$(VPATH)}thread_native.h
|
||||
file.$(OBJEXT): {$(VPATH)}util.h
|
||||
gc.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h
|
||||
gc.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h
|
||||
|
@ -7810,6 +7815,10 @@ io.$(OBJEXT): {$(VPATH)}thread_native.h
|
|||
io.$(OBJEXT): {$(VPATH)}util.h
|
||||
io.$(OBJEXT): {$(VPATH)}vm_core.h
|
||||
io.$(OBJEXT): {$(VPATH)}vm_opts.h
|
||||
io_buffer.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h
|
||||
io_buffer.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h
|
||||
io_buffer.$(OBJEXT): $(CCAN_DIR)/list/list.h
|
||||
io_buffer.$(OBJEXT): $(CCAN_DIR)/str/str.h
|
||||
io_buffer.$(OBJEXT): $(hdrdir)/ruby/ruby.h
|
||||
io_buffer.$(OBJEXT): $(top_srcdir)/internal/array.h
|
||||
io_buffer.$(OBJEXT): $(top_srcdir)/internal/bignum.h
|
||||
|
@ -7995,6 +8004,7 @@ io_buffer.$(OBJEXT): {$(VPATH)}onigmo.h
|
|||
io_buffer.$(OBJEXT): {$(VPATH)}oniguruma.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}st.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}subst.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}thread_native.h
|
||||
iseq.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h
|
||||
iseq.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h
|
||||
iseq.$(OBJEXT): $(CCAN_DIR)/list/list.h
|
||||
|
|
|
@ -10,6 +10,8 @@
|
|||
*/
|
||||
#include "ruby/ruby.h" /* for VALUE */
|
||||
#include "ruby/intern.h" /* for rb_blocking_function_t */
|
||||
#include "ccan/list/list.h" /* for list in rb_io_close_wait_list */
|
||||
#include "ruby/thread_native.h" /* for mutexes in rb_io_close_wait_list */
|
||||
|
||||
struct rb_thread_struct; /* in vm_core.h */
|
||||
|
||||
|
@ -52,6 +54,14 @@ VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g,
|
|||
|
||||
int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);
|
||||
|
||||
struct rb_io_close_wait_list {
|
||||
struct ccan_list_head list;
|
||||
rb_nativethread_lock_t mu;
|
||||
rb_nativethread_cond_t cv;
|
||||
};
|
||||
int rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy);
|
||||
void rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy);
|
||||
|
||||
RUBY_SYMBOL_EXPORT_BEGIN
|
||||
/* Temporary. This API will be removed (renamed). */
|
||||
VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);
|
||||
|
|
16
io.c
16
io.c
|
@ -5422,9 +5422,17 @@ maygvl_fclose(FILE *file, int keepgvl)
|
|||
static void free_io_buffer(rb_io_buffer_t *buf);
|
||||
static void clear_codeconv(rb_io_t *fptr);
|
||||
|
||||
static void*
|
||||
call_close_wait_nogvl(void *arg)
|
||||
{
|
||||
struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
|
||||
rb_notify_fd_close_wait(busy);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
|
||||
struct ccan_list_head *busy)
|
||||
struct rb_io_close_wait_list *busy)
|
||||
{
|
||||
VALUE error = Qnil;
|
||||
int fd = fptr->fd;
|
||||
|
@ -5467,7 +5475,7 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
|
|||
// Ensure waiting_fd users do not hit EBADF.
|
||||
if (busy) {
|
||||
// Wait for them to exit before we call close().
|
||||
do rb_thread_schedule(); while (!ccan_list_empty(busy));
|
||||
(void)rb_thread_call_without_gvl(call_close_wait_nogvl, busy, RUBY_UBF_IO, 0);
|
||||
}
|
||||
|
||||
// Disable for now.
|
||||
|
@ -5618,16 +5626,14 @@ rb_io_memsize(const rb_io_t *fptr)
|
|||
# define KEEPGVL FALSE
|
||||
#endif
|
||||
|
||||
int rb_notify_fd_close(int fd, struct ccan_list_head *);
|
||||
static rb_io_t *
|
||||
io_close_fptr(VALUE io)
|
||||
{
|
||||
rb_io_t *fptr;
|
||||
VALUE write_io;
|
||||
rb_io_t *write_fptr;
|
||||
struct ccan_list_head busy;
|
||||
struct rb_io_close_wait_list busy;
|
||||
|
||||
ccan_list_head_init(&busy);
|
||||
write_io = GetWriteIO(io);
|
||||
if (io != write_io) {
|
||||
write_fptr = RFILE(write_io)->fptr;
|
||||
|
|
50
thread.c
50
thread.c
|
@ -155,6 +155,7 @@ struct waiting_fd {
|
|||
struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
|
||||
rb_thread_t *th;
|
||||
int fd;
|
||||
struct rb_io_close_wait_list *busy;
|
||||
};
|
||||
|
||||
/********************************************************************************/
|
||||
|
@ -1672,7 +1673,8 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
|||
|
||||
struct waiting_fd waiting_fd = {
|
||||
.fd = fd,
|
||||
.th = rb_ec_thread_ptr(ec)
|
||||
.th = rb_ec_thread_ptr(ec),
|
||||
.busy = NULL,
|
||||
};
|
||||
|
||||
// `errno` is only valid when there is an actual error - but we can't
|
||||
|
@ -1702,7 +1704,14 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
|||
*/
|
||||
RB_VM_LOCK_ENTER();
|
||||
{
|
||||
if (waiting_fd.busy) {
|
||||
rb_native_mutex_lock(&waiting_fd.busy->mu);
|
||||
}
|
||||
ccan_list_del(&waiting_fd.wfd_node);
|
||||
if (waiting_fd.busy) {
|
||||
rb_native_cond_broadcast(&waiting_fd.busy->cv);
|
||||
rb_native_mutex_unlock(&waiting_fd.busy->mu);
|
||||
}
|
||||
}
|
||||
RB_VM_LOCK_LEAVE();
|
||||
|
||||
|
@ -2461,10 +2470,12 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
|
|||
}
|
||||
|
||||
int
|
||||
rb_notify_fd_close(int fd, struct ccan_list_head *busy)
|
||||
rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
|
||||
{
|
||||
rb_vm_t *vm = GET_THREAD()->vm;
|
||||
struct waiting_fd *wfd = 0, *next;
|
||||
ccan_list_head_init(&busy->list);
|
||||
int has_any;
|
||||
|
||||
RB_VM_LOCK_ENTER();
|
||||
{
|
||||
|
@ -2474,27 +2485,52 @@ rb_notify_fd_close(int fd, struct ccan_list_head *busy)
|
|||
VALUE err;
|
||||
|
||||
ccan_list_del(&wfd->wfd_node);
|
||||
ccan_list_add(busy, &wfd->wfd_node);
|
||||
ccan_list_add(&busy->list, &wfd->wfd_node);
|
||||
|
||||
wfd->busy = busy;
|
||||
err = th->vm->special_exceptions[ruby_error_stream_closed];
|
||||
rb_threadptr_pending_interrupt_enque(th, err);
|
||||
rb_threadptr_interrupt(th);
|
||||
}
|
||||
}
|
||||
}
|
||||
has_any = !ccan_list_empty(&busy->list);
|
||||
if (has_any) {
|
||||
rb_native_mutex_initialize(&busy->mu);
|
||||
rb_native_cond_initialize(&busy->cv);
|
||||
}
|
||||
RB_VM_LOCK_LEAVE();
|
||||
|
||||
return !ccan_list_empty(busy);
|
||||
return has_any;
|
||||
}
|
||||
|
||||
void
|
||||
rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
|
||||
{
|
||||
rb_native_mutex_lock(&busy->mu);
|
||||
while (!ccan_list_empty(&busy->list)) {
|
||||
rb_native_cond_wait(&busy->cv, &busy->mu);
|
||||
};
|
||||
rb_native_mutex_unlock(&busy->mu);
|
||||
rb_native_mutex_destroy(&busy->mu);
|
||||
rb_native_cond_destroy(&busy->cv);
|
||||
}
|
||||
|
||||
static void*
|
||||
call_notify_fd_close_wait_nogvl(void *arg)
|
||||
{
|
||||
struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
|
||||
rb_notify_fd_close_wait(busy);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void
|
||||
rb_thread_fd_close(int fd)
|
||||
{
|
||||
struct ccan_list_head busy;
|
||||
struct rb_io_close_wait_list busy;
|
||||
|
||||
ccan_list_head_init(&busy);
|
||||
if (rb_notify_fd_close(fd, &busy)) {
|
||||
do rb_thread_schedule(); while (!ccan_list_empty(&busy));
|
||||
rb_thread_call_without_gvl(call_notify_fd_close_wait_nogvl, &busy, RUBY_UBF_IO, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче