зеркало из https://github.com/github/ruby.git
process.c: simplify SIGCHLD-based waitpid
Introduce a new rb_thread_sleep_interruptible that does not execute interrupts before sleeping. Skipping the interrupt check before sleep is required for out-of-GVL ruby_waitpid_all to function properly when setting waitpid_state.ret Now that ubf_select can be called by the gvl.timer thread without recursive locking gvl.lock, we can safely use rb_threadptr_interrupt to deal with waking up sleeping processes, git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64576 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
Родитель
943bf37b10
Коммит
9d4027b503
133
process.c
133
process.c
|
@ -924,18 +924,16 @@ struct waitpid_state {
|
|||
int status;
|
||||
int options;
|
||||
int errnum;
|
||||
int sigwait_fd;
|
||||
};
|
||||
|
||||
void rb_native_mutex_lock(rb_nativethread_lock_t *);
|
||||
void rb_native_mutex_unlock(rb_nativethread_lock_t *);
|
||||
void rb_native_cond_signal(rb_nativethread_cond_t *);
|
||||
void rb_native_cond_wait(rb_nativethread_cond_t *, rb_nativethread_lock_t *);
|
||||
rb_nativethread_cond_t *rb_sleep_cond_get(const rb_execution_context_t *);
|
||||
void rb_sleep_cond_put(rb_nativethread_cond_t *);
|
||||
int rb_sigwait_fd_get(const rb_thread_t *);
|
||||
void rb_sigwait_sleep(const rb_thread_t *, int fd, const struct timespec *);
|
||||
void rb_sigwait_fd_put(const rb_thread_t *, int fd);
|
||||
void rb_thread_sleep_interruptible(void);
|
||||
|
||||
static int
|
||||
sigwait_fd_migrate_signaled_p(struct waitpid_state *w)
|
||||
|
@ -982,19 +980,6 @@ rb_sigwait_fd_migrate(rb_vm_t *vm)
|
|||
}
|
||||
|
||||
#if RUBY_SIGCHLD
|
||||
static void
|
||||
waitpid_notify(struct waitpid_state *w, rb_pid_t ret)
|
||||
{
|
||||
w->ret = ret;
|
||||
list_del_init(&w->wnode);
|
||||
if (w->cond) {
|
||||
rb_native_cond_signal(w->cond);
|
||||
}
|
||||
else {
|
||||
/* w is owned by this thread */
|
||||
}
|
||||
}
|
||||
|
||||
extern volatile unsigned int ruby_nocldwait; /* signal.c */
|
||||
/* called by timer thread or thread which acquired sigwait_fd */
|
||||
static void
|
||||
|
@ -1008,15 +993,15 @@ waitpid_each(struct list_head *head)
|
|||
if (!ret) continue;
|
||||
if (ret == -1) w->errnum = errno;
|
||||
|
||||
w->ret = ret;
|
||||
list_del_init(&w->wnode);
|
||||
if (w->ec) { /* rb_waitpid */
|
||||
rb_thread_t *th = rb_ec_thread_ptr(w->ec);
|
||||
|
||||
rb_native_mutex_lock(&th->interrupt_lock);
|
||||
waitpid_notify(w, ret);
|
||||
rb_native_mutex_unlock(&th->interrupt_lock);
|
||||
rb_threadptr_interrupt(rb_ec_thread_ptr(w->ec));
|
||||
}
|
||||
else { /* ruby_waitpid_locked */
|
||||
waitpid_notify(w, ret);
|
||||
if (w->cond) {
|
||||
rb_native_cond_signal(w->cond);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1079,17 +1064,18 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
|
|||
if (w.ret == -1) w.errnum = errno;
|
||||
}
|
||||
else {
|
||||
int sigwait_fd = -1;
|
||||
|
||||
w.ec = 0;
|
||||
w.sigwait_fd = -1;
|
||||
list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode);
|
||||
do {
|
||||
if (w.sigwait_fd < 0)
|
||||
w.sigwait_fd = rb_sigwait_fd_get(0);
|
||||
if (sigwait_fd < 0)
|
||||
sigwait_fd = rb_sigwait_fd_get(0);
|
||||
|
||||
if (w.sigwait_fd >= 0) {
|
||||
if (sigwait_fd >= 0) {
|
||||
w.cond = 0;
|
||||
rb_native_mutex_unlock(&vm->waitpid_lock);
|
||||
rb_sigwait_sleep(0, w.sigwait_fd, sigwait_sleep_time());
|
||||
rb_sigwait_sleep(0, sigwait_fd, sigwait_sleep_time());
|
||||
rb_native_mutex_lock(&vm->waitpid_lock);
|
||||
}
|
||||
else {
|
||||
|
@ -1100,8 +1086,8 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
|
|||
list_del(&w.wnode);
|
||||
|
||||
/* we're done, maybe other waitpid callers are not: */
|
||||
if (w.sigwait_fd >= 0) {
|
||||
rb_sigwait_fd_put(0, w.sigwait_fd);
|
||||
if (sigwait_fd >= 0) {
|
||||
rb_sigwait_fd_put(0, sigwait_fd);
|
||||
sigwait_fd_migrate_sleeper(vm);
|
||||
}
|
||||
}
|
||||
|
@ -1112,80 +1098,13 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
|
|||
return w.ret;
|
||||
}
|
||||
|
||||
static void
|
||||
waitpid_wake(void *x)
|
||||
{
|
||||
struct waitpid_state *w = x;
|
||||
|
||||
/* th->interrupt_lock is already held by rb_threadptr_interrupt_common */
|
||||
if (w->cond)
|
||||
rb_native_cond_signal(w->cond);
|
||||
else
|
||||
rb_thread_wakeup_timer_thread(0); /* kick sigwait_fd */
|
||||
}
|
||||
|
||||
static void *
|
||||
waitpid_nogvl(void *x)
|
||||
{
|
||||
struct waitpid_state *w = x;
|
||||
rb_thread_t *th = rb_ec_thread_ptr(w->ec);
|
||||
|
||||
rb_native_mutex_lock(&th->interrupt_lock);
|
||||
/*
|
||||
* We must check again before waiting, timer-thread may change w->ret
|
||||
* by the time we enter this. And we may also be interrupted.
|
||||
*/
|
||||
if (!w->ret && !RUBY_VM_INTERRUPTED_ANY(w->ec)) {
|
||||
if (w->sigwait_fd < 0)
|
||||
w->sigwait_fd = rb_sigwait_fd_get(th);
|
||||
|
||||
if (w->sigwait_fd >= 0) {
|
||||
rb_nativethread_cond_t *cond = w->cond;
|
||||
|
||||
w->cond = 0;
|
||||
rb_native_mutex_unlock(&th->interrupt_lock);
|
||||
rb_sigwait_sleep(th, w->sigwait_fd, sigwait_sleep_time());
|
||||
rb_native_mutex_lock(&th->interrupt_lock);
|
||||
w->cond = cond;
|
||||
}
|
||||
else {
|
||||
if (!w->cond)
|
||||
w->cond = rb_sleep_cond_get(w->ec);
|
||||
|
||||
/* another thread calling rb_sigwait_sleep will process
|
||||
* signals for us */
|
||||
if (SIGCHLD_LOSSY) {
|
||||
/*
|
||||
* XXX this may not be needed anymore with new GVL
|
||||
* and sigwait_fd usage
|
||||
*/
|
||||
rb_thread_wakeup_timer_thread(0);
|
||||
}
|
||||
rb_native_cond_wait(w->cond, &th->interrupt_lock);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* we must release th->native_thread_data.sleep_cond when
|
||||
* re-acquiring GVL:
|
||||
*/
|
||||
if (w->cond) {
|
||||
rb_sleep_cond_put(w->cond);
|
||||
w->cond = 0;
|
||||
}
|
||||
|
||||
rb_native_mutex_unlock(&th->interrupt_lock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
waitpid_sleep(VALUE x)
|
||||
{
|
||||
struct waitpid_state *w = (struct waitpid_state *)x;
|
||||
|
||||
while (!w->ret) {
|
||||
rb_thread_call_without_gvl(waitpid_nogvl, w, waitpid_wake, w);
|
||||
rb_thread_sleep_interruptible();
|
||||
}
|
||||
|
||||
return Qfalse;
|
||||
|
@ -1204,14 +1123,6 @@ waitpid_cleanup(VALUE x)
|
|||
rb_native_mutex_unlock(&vm->waitpid_lock);
|
||||
}
|
||||
|
||||
/* we may have never released and re-acquired GVL */
|
||||
if (w->cond)
|
||||
rb_sleep_cond_put(w->cond);
|
||||
|
||||
if (w->sigwait_fd >= 0) {
|
||||
rb_sigwait_fd_put(rb_ec_thread_ptr(w->ec), w->sigwait_fd);
|
||||
rb_sigwait_fd_migrate(rb_ec_vm_ptr(w->ec));
|
||||
}
|
||||
return Qfalse;
|
||||
}
|
||||
|
||||
|
@ -1219,6 +1130,7 @@ static void
|
|||
waitpid_wait(struct waitpid_state *w)
|
||||
{
|
||||
rb_vm_t *vm = rb_ec_vm_ptr(w->ec);
|
||||
int need_sleep = FALSE;
|
||||
|
||||
/*
|
||||
* Lock here to prevent do_waitpid from stealing work from the
|
||||
|
@ -1230,22 +1142,23 @@ waitpid_wait(struct waitpid_state *w)
|
|||
if (w->pid > 0 || list_empty(&vm->waiting_pids))
|
||||
w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
|
||||
if (w->ret) {
|
||||
w->cond = 0;
|
||||
if (w->ret == -1) w->errnum = errno;
|
||||
}
|
||||
else if (w->options & WNOHANG) {
|
||||
w->cond = 0;
|
||||
}
|
||||
else {
|
||||
w->cond = rb_sleep_cond_get(w->ec);
|
||||
w->sigwait_fd = -1;
|
||||
need_sleep = TRUE;
|
||||
}
|
||||
|
||||
if (need_sleep) {
|
||||
w->cond = 0;
|
||||
/* order matters, favor specified PIDs rather than -1 or 0 */
|
||||
list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode);
|
||||
}
|
||||
|
||||
rb_native_mutex_unlock(&vm->waitpid_lock);
|
||||
|
||||
if (w->cond) {
|
||||
if (need_sleep) {
|
||||
rb_ensure(waitpid_sleep, (VALUE)w, waitpid_cleanup, (VALUE)w);
|
||||
}
|
||||
}
|
||||
|
|
34
thread.c
34
thread.c
|
@ -1256,6 +1256,18 @@ rb_thread_sleep_deadly(void)
|
|||
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
|
||||
}
|
||||
|
||||
void
|
||||
rb_thread_sleep_interruptible(void)
|
||||
{
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
enum rb_thread_status prev_status = th->status;
|
||||
|
||||
th->status = THREAD_STOPPED;
|
||||
native_sleep(th, 0);
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
|
||||
th->status = prev_status;
|
||||
}
|
||||
|
||||
static void
|
||||
rb_thread_sleep_deadly_allow_spurious_wakeup(void)
|
||||
{
|
||||
|
@ -5408,25 +5420,3 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
|
|||
|
||||
return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack);
|
||||
}
|
||||
|
||||
#ifndef USE_NATIVE_SLEEP_COND
|
||||
# define USE_NATIVE_SLEEP_COND (0)
|
||||
#endif
|
||||
|
||||
#if !USE_NATIVE_SLEEP_COND
|
||||
rb_nativethread_cond_t *
|
||||
rb_sleep_cond_get(const rb_execution_context_t *ec)
|
||||
{
|
||||
rb_nativethread_cond_t *cond = ALLOC(rb_nativethread_cond_t);
|
||||
rb_native_cond_initialize(cond);
|
||||
|
||||
return cond;
|
||||
}
|
||||
|
||||
void
|
||||
rb_sleep_cond_put(rb_nativethread_cond_t *cond)
|
||||
{
|
||||
rb_native_cond_destroy(cond);
|
||||
xfree(cond);
|
||||
}
|
||||
#endif /* !USE_NATIVE_SLEEP_COND */
|
||||
|
|
|
@ -1873,26 +1873,6 @@ rb_thread_create_mjit_thread(void (*worker_func)(void))
|
|||
return ret;
|
||||
}
|
||||
|
||||
#ifndef USE_NATIVE_SLEEP_COND
|
||||
#define USE_NATIVE_SLEEP_COND (1)
|
||||
#endif
|
||||
|
||||
#if USE_NATIVE_SLEEP_COND
|
||||
rb_nativethread_cond_t *
|
||||
rb_sleep_cond_get(const rb_execution_context_t *ec)
|
||||
{
|
||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
|
||||
return &th->native_thread_data.cond.intr;
|
||||
}
|
||||
|
||||
void
|
||||
rb_sleep_cond_put(rb_nativethread_cond_t *cond)
|
||||
{
|
||||
/* no-op */
|
||||
}
|
||||
#endif /* USE_NATIVE_SLEEP_COND */
|
||||
|
||||
int
|
||||
rb_sigwait_fd_get(const rb_thread_t *th)
|
||||
{
|
||||
|
|
Загрузка…
Ссылка в новой задаче