/* -*-c-*- */
/**********************************************************************

  thread_pthread.c -

  $Author$

  Copyright (C) 2004-2007 Koichi Sasada

**********************************************************************/

#ifdef THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION

#include "internal/gc.h"
#include "rjit.h"

#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#ifdef HAVE_THR_STKSEGMENT
#include <thread.h>
#endif
#if defined(HAVE_FCNTL_H)
#include <fcntl.h>
#elif defined(HAVE_SYS_FCNTL_H)
#include <sys/fcntl.h>
#endif
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
#if defined(HAVE_SYS_TIME_H)
#include <sys/time.h>
#endif
#if defined(__HAIKU__)
#include <kernel/OS.h>
#endif
#ifdef __linux__
#include <sys/syscall.h> /* for SYS_gettid */
#endif
#include <time.h>
#include <signal.h>

#if defined __APPLE__
# include <mach/mach_time.h>
#endif

#if defined(HAVE_SYS_EVENTFD_H) && defined(HAVE_EVENTFD)
# define USE_EVENTFD (1)
# include <sys/eventfd.h>
#else
# define USE_EVENTFD (0)
#endif

#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && \
    defined(CLOCK_REALTIME) && defined(CLOCK_MONOTONIC) && \
    defined(HAVE_CLOCK_GETTIME)
static pthread_condattr_t condattr_mono;
static pthread_condattr_t *condattr_monotonic = &condattr_mono;
#else
static const void *const condattr_monotonic = NULL;
#endif

#include COROUTINE_H

#ifndef HAVE_SYS_EVENT_H
#define HAVE_SYS_EVENT_H 0
#endif

#ifndef HAVE_SYS_EPOLL_H
#define HAVE_SYS_EPOLL_H 0
#else
// force setting for debug
// #undef HAVE_SYS_EPOLL_H
// #define HAVE_SYS_EPOLL_H 0
#endif

#ifndef USE_MN_THREADS
  #if defined(__EMSCRIPTEN__) || defined(COROUTINE_PTHREAD_CONTEXT)
    // __EMSCRIPTEN__ provides epoll* declarations, but no implementations.
    // On COROUTINE_PTHREAD_CONTEXT, it is not worth using M:N threads.
    #define USE_MN_THREADS 0
  #elif HAVE_SYS_EPOLL_H
    #include <sys/epoll.h>
    #define USE_MN_THREADS 1
  #elif HAVE_SYS_EVENT_H
    #include <sys/event.h>
    #define USE_MN_THREADS 1
  #else
    #define USE_MN_THREADS 0
  #endif
#endif

// native thread wrappers

#define NATIVE_MUTEX_LOCK_DEBUG 0

static void
mutex_debug(const char *msg, void *lock)
{
    if (NATIVE_MUTEX_LOCK_DEBUG) {
        int r;
        static pthread_mutex_t dbglock = PTHREAD_MUTEX_INITIALIZER;

        if ((r = pthread_mutex_lock(&dbglock)) != 0) {exit(EXIT_FAILURE);}
        fprintf(stdout, "%s: %p\n", msg, lock);
        if ((r = pthread_mutex_unlock(&dbglock)) != 0) {exit(EXIT_FAILURE);}
    }
}

void
rb_native_mutex_lock(pthread_mutex_t *lock)
{
    int r;
    mutex_debug("lock", lock);
    if ((r = pthread_mutex_lock(lock)) != 0) {
        rb_bug_errno("pthread_mutex_lock", r);
    }
}

void
rb_native_mutex_unlock(pthread_mutex_t *lock)
{
    int r;
    mutex_debug("unlock", lock);
    if ((r = pthread_mutex_unlock(lock)) != 0) {
        rb_bug_errno("pthread_mutex_unlock", r);
    }
}

int
rb_native_mutex_trylock(pthread_mutex_t *lock)
{
    int r;
    mutex_debug("trylock", lock);
    if ((r = pthread_mutex_trylock(lock)) != 0) {
        if (r == EBUSY) {
            return EBUSY;
        }
        else {
            rb_bug_errno("pthread_mutex_trylock", r);
        }
    }
    return 0;
}

void
rb_native_mutex_initialize(pthread_mutex_t *lock)
{
    int r = pthread_mutex_init(lock, 0);
    mutex_debug("init", lock);
    if (r != 0) {
        rb_bug_errno("pthread_mutex_init", r);
    }
}

void
rb_native_mutex_destroy(pthread_mutex_t *lock)
{
    int r = pthread_mutex_destroy(lock);
    mutex_debug("destroy", lock);
    if (r != 0) {
        rb_bug_errno("pthread_mutex_destroy", r);
    }
}

void
rb_native_cond_initialize(rb_nativethread_cond_t *cond)
{
    int r = pthread_cond_init(cond, condattr_monotonic);
    if (r != 0) {
        rb_bug_errno("pthread_cond_init", r);
    }
}

void
rb_native_cond_destroy(rb_nativethread_cond_t *cond)
{
    int r = pthread_cond_destroy(cond);
    if (r != 0) {
        rb_bug_errno("pthread_cond_destroy", r);
    }
}

/*
 * In OS X 10.7 (Lion), pthread_cond_signal and pthread_cond_broadcast return
 * EAGAIN after
 * retrying 8192 times. You can see them in the following page:
 *
 * http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
 *
 * The following rb_native_cond_signal and rb_native_cond_broadcast functions
 * need to retry until the pthread functions no longer return EAGAIN.
 */

void
rb_native_cond_signal(rb_nativethread_cond_t *cond)
{
    int r;
    do {
        r = pthread_cond_signal(cond);
    } while (r == EAGAIN);
    if (r != 0) {
        rb_bug_errno("pthread_cond_signal", r);
    }
}

void
rb_native_cond_broadcast(rb_nativethread_cond_t *cond)
{
    int r;
    do {
        r = pthread_cond_broadcast(cond);
    } while (r == EAGAIN);
    if (r != 0) {
        rb_bug_errno("rb_native_cond_broadcast", r);
    }
}

void
rb_native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex)
{
    int r = pthread_cond_wait(cond, mutex);
    if (r != 0) {
        rb_bug_errno("pthread_cond_wait", r);
    }
}

static int
native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs)
{
    int r;
    struct timespec ts;

    /*
     * An old Linux may return EINTR. Even though POSIX says
     *   "These functions shall not return an error code of [EINTR]".
     *   http://pubs.opengroup.org/onlinepubs/009695399/functions/pthread_cond_timedwait.html
     * Let's hide it from arch generic code.
     */
    do {
        rb_hrtime2timespec(&ts, abs);
        r = pthread_cond_timedwait(cond, mutex, &ts);
    } while (r == EINTR);

    if (r != 0 && r != ETIMEDOUT) {
        rb_bug_errno("pthread_cond_timedwait", r);
    }

    return r;
}

static rb_hrtime_t
native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel)
{
    if (condattr_monotonic) {
        return rb_hrtime_add(rb_hrtime_now(), rel);
    }
    else {
        struct timespec ts;

        rb_timespec_now(&ts);
        return rb_hrtime_add(rb_timespec2hrtime(&ts), rel);
    }
}

void
rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec)
{
    rb_hrtime_t hrmsec = native_cond_timeout(cond, RB_HRTIME_PER_MSEC * msec);
    native_cond_timedwait(cond, mutex, &hrmsec);
}

// thread scheduling

static rb_internal_thread_event_hook_t *rb_internal_thread_event_hooks = NULL;
static void rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th);

#if 0
static const char *
event_name(rb_event_flag_t event)
{
    switch (event) {
      case RUBY_INTERNAL_THREAD_EVENT_STARTED:
        return "STARTED";
      case RUBY_INTERNAL_THREAD_EVENT_READY:
        return "READY";
      case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
        return "RESUMED";
      case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
        return "SUSPENDED";
      case RUBY_INTERNAL_THREAD_EVENT_EXITED:
        return "EXITED";
    }
    return "no-event";
}

#define RB_INTERNAL_THREAD_HOOK(event, th) \
    if (UNLIKELY(rb_internal_thread_event_hooks)) { \
        fprintf(stderr, "[thread=%"PRIxVALUE"] %s in %s (%s:%d)\n", th->self, event_name(event), __func__, __FILE__, __LINE__); \
        rb_thread_execute_hooks(event, th); \
    }
#else
#define RB_INTERNAL_THREAD_HOOK(event, th) if (UNLIKELY(rb_internal_thread_event_hooks)) { rb_thread_execute_hooks(event, th); }
#endif

static rb_serial_t current_fork_gen = 1; /* We can't use GET_VM()->fork_gen */

#if defined(SIGVTALRM) && !defined(__CYGWIN__) && !defined(__EMSCRIPTEN__)
#  define USE_UBF_LIST 1
#endif

static void threadptr_trap_interrupt(rb_thread_t *);

#ifdef HAVE_SCHED_YIELD
#define native_thread_yield() (void)sched_yield()
#else
#define native_thread_yield() ((void)0)
#endif

/* 100ms.
 * 10ms is too small for user level thread scheduling
 * on recent Linux (tested on 2.6.35) */
#define TIME_QUANTUM_MSEC (100)
#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000)
#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)

static void native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt);
static void native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt);
static void native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th);

static void ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r);
static void timer_thread_wakeup(void);
static void timer_thread_wakeup_locked(rb_vm_t *vm);
static void timer_thread_wakeup_force(void);
static void thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th);

#define thread_sched_dump(s) thread_sched_dump_(__FILE__, __LINE__, s)

static bool
th_has_dedicated_nt(const rb_thread_t *th)
{
    // TODO: th->has_dedicated_nt
    return th->nt->dedicated > 0;
}

RBIMPL_ATTR_MAYBE_UNUSED()
static void
thread_sched_dump_(const char *file, int line, struct rb_thread_sched *sched)
{
    fprintf(stderr, "@%s:%d running:%d\n", file, line, sched->running ? (int)sched->running->serial : -1);
    rb_thread_t *th;
    int i = 0;
    ccan_list_for_each(&sched->readyq, th, sched.node.readyq) {
        i++;
        if (i > 10) rb_bug("too many");
        fprintf(stderr, "  ready:%d (%sNT:%d)\n", th->serial,
                th->nt ? (th->nt->dedicated ? "D" : "S") : "x",
                th->nt ? (int)th->nt->serial : -1);
    }
}

#define ractor_sched_dump(s) ractor_sched_dump_(__FILE__, __LINE__, s)

RBIMPL_ATTR_MAYBE_UNUSED()
static void
ractor_sched_dump_(const char *file, int line, rb_vm_t *vm)
{
    rb_ractor_t *r;

    fprintf(stderr, "ractor_sched_dump %s:%d\n", file, line);

    int i = 0;
    ccan_list_for_each(&vm->ractor.sched.grq, r, threads.sched.grq_node) {
        i++;
        if (i > 10) rb_bug("!!");
        fprintf(stderr, "  %d ready:%d\n", i, rb_ractor_id(r));
    }
}

#define thread_sched_lock(a, b) thread_sched_lock_(a, b, __FILE__, __LINE__)
#define thread_sched_unlock(a, b) thread_sched_unlock_(a, b, __FILE__, __LINE__)

static void
thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line)
{
    rb_native_mutex_lock(&sched->lock_);

#if VM_CHECK_MODE
    RUBY_DEBUG_LOG2(file, line, "th:%u prev_owner:%u", rb_th_serial(th), rb_th_serial(sched->lock_owner));
    VM_ASSERT(sched->lock_owner == NULL);
    sched->lock_owner = th;
#else
    RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th));
#endif
}

static void
thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line)
{
    RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th));

#if VM_CHECK_MODE
    VM_ASSERT(sched->lock_owner == th);
    sched->lock_owner = NULL;
#endif

    rb_native_mutex_unlock(&sched->lock_);
}

static void
thread_sched_set_lock_owner(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

#if VM_CHECK_MODE > 0
    sched->lock_owner = th;
#endif
}

static void
ASSERT_thread_sched_locked(struct rb_thread_sched *sched, rb_thread_t *th)
{
    VM_ASSERT(rb_native_mutex_trylock(&sched->lock_) == EBUSY);

#if VM_CHECK_MODE
    if (th) {
        VM_ASSERT(sched->lock_owner == th);
    }
    else {
        VM_ASSERT(sched->lock_owner != NULL);
    }
#endif
}

#define ractor_sched_lock(a, b) ractor_sched_lock_(a, b, __FILE__, __LINE__)
#define ractor_sched_unlock(a, b) ractor_sched_unlock_(a, b, __FILE__, __LINE__)

RBIMPL_ATTR_MAYBE_UNUSED()
static unsigned int
rb_ractor_serial(const rb_ractor_t *r)
{
    if (r) {
        return rb_ractor_id(r);
    }
    else {
        return 0;
    }
}

static void
ractor_sched_set_locked(rb_vm_t *vm, rb_ractor_t *cr)
{
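    // Note: the lock-owner bookkeeping below is compiled only under
    // VM_CHECK_MODE; in normal builds these setters are effectively no-ops
    // and the pthread mutex taken in ractor_sched_lock_() is the only
    // synchronization.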
#if VM_CHECK_MODE > 0
    VM_ASSERT(vm->ractor.sched.lock_owner == NULL);
    VM_ASSERT(vm->ractor.sched.locked == false);

    vm->ractor.sched.lock_owner = cr;
    vm->ractor.sched.locked = true;
#endif
}

static void
ractor_sched_set_unlocked(rb_vm_t *vm, rb_ractor_t *cr)
{
#if VM_CHECK_MODE > 0
    VM_ASSERT(vm->ractor.sched.locked);
    VM_ASSERT(vm->ractor.sched.lock_owner == cr);

    vm->ractor.sched.locked = false;
    vm->ractor.sched.lock_owner = NULL;
#endif
}

static void
ractor_sched_lock_(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
{
    rb_native_mutex_lock(&vm->ractor.sched.lock);

#if VM_CHECK_MODE
    RUBY_DEBUG_LOG2(file, line, "cr:%u prev_owner:%u", rb_ractor_serial(cr), rb_ractor_serial(vm->ractor.sched.lock_owner));
#else
    RUBY_DEBUG_LOG2(file, line, "cr:%u", rb_ractor_serial(cr));
#endif

    ractor_sched_set_locked(vm, cr);
}

static void
ractor_sched_unlock_(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
{
    RUBY_DEBUG_LOG2(file, line, "cr:%u", rb_ractor_serial(cr));

    ractor_sched_set_unlocked(vm, cr);
    rb_native_mutex_unlock(&vm->ractor.sched.lock);
}

static void
ASSERT_ractor_sched_locked(rb_vm_t *vm, rb_ractor_t *cr)
{
    VM_ASSERT(rb_native_mutex_trylock(&vm->ractor.sched.lock) == EBUSY);
    VM_ASSERT(vm->ractor.sched.locked);
    VM_ASSERT(cr == NULL || vm->ractor.sched.lock_owner == cr);
}

RBIMPL_ATTR_MAYBE_UNUSED()
static bool
ractor_sched_running_threads_contain_p(rb_vm_t *vm, rb_thread_t *th)
{
    rb_thread_t *rth;
    ccan_list_for_each(&vm->ractor.sched.running_threads, rth, sched.node.running_threads) {
        if (rth == th) return true;
    }
    return false;
}

RBIMPL_ATTR_MAYBE_UNUSED()
static unsigned int
ractor_sched_running_threads_size(rb_vm_t *vm)
{
    rb_thread_t *th;
    unsigned int i = 0;
    ccan_list_for_each(&vm->ractor.sched.running_threads, th, sched.node.running_threads) {
        i++;
    }
    return i;
}

RBIMPL_ATTR_MAYBE_UNUSED()
static unsigned int
ractor_sched_timeslice_threads_size(rb_vm_t *vm)
{
    rb_thread_t *th;
    unsigned int i = 0;
    ccan_list_for_each(&vm->ractor.sched.timeslice_threads, th, sched.node.timeslice_threads) {
        i++;
    }
    return i;
}

RBIMPL_ATTR_MAYBE_UNUSED()
static bool
ractor_sched_timeslice_threads_contain_p(rb_vm_t *vm, rb_thread_t *th)
{
    rb_thread_t *rth;
    ccan_list_for_each(&vm->ractor.sched.timeslice_threads, rth, sched.node.timeslice_threads) {
        if (rth == th) return true;
    }
    return false;
}

static void ractor_sched_barrier_join_signal_locked(rb_vm_t *vm);
static void ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th);

// setup timeslice signals by the timer thread.
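//
// A rough sketch of the bookkeeping below (inferred from the code; not an
// authoritative spec): under the ractor-scheduler lock, del_th is unlinked
// from vm->ractor.sched.running_threads and add_th is linked in. A running
// thread is additionally registered as a "timeslice thread" when it should
// receive periodic preemption, and the timer thread is woken only when the
// timeslice set transitions from empty to non-empty.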
static void
thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *cr, rb_vm_t *vm,
                                   rb_thread_t *add_th, rb_thread_t *del_th, rb_thread_t *add_timeslice_th)
{
#if USE_RUBY_DEBUG_LOG
    unsigned int prev_running_cnt = vm->ractor.sched.running_cnt;
#endif

    rb_thread_t *del_timeslice_th;

    if (del_th && sched->is_running_timeslice) {
        del_timeslice_th = del_th;
        sched->is_running_timeslice = false;
    }
    else {
        del_timeslice_th = NULL;
    }

    RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u",
                   rb_th_serial(add_th), rb_th_serial(del_th),
                   rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th));

    ractor_sched_lock(vm, cr);
    {
        // update running_threads
        if (del_th) {
            VM_ASSERT(ractor_sched_running_threads_contain_p(vm, del_th));
            VM_ASSERT(del_timeslice_th != NULL ||
                      !ractor_sched_timeslice_threads_contain_p(vm, del_th));

            ccan_list_del_init(&del_th->sched.node.running_threads);
            vm->ractor.sched.running_cnt--;

            if (UNLIKELY(vm->ractor.sched.barrier_waiting)) {
                ractor_sched_barrier_join_signal_locked(vm);
            }
            sched->is_running = false;
        }

        if (add_th) {
            if (UNLIKELY(vm->ractor.sched.barrier_waiting)) {
                RUBY_DEBUG_LOG("barrier-wait");

                ractor_sched_barrier_join_signal_locked(vm);
                ractor_sched_barrier_join_wait_locked(vm, add_th);
            }

            VM_ASSERT(!ractor_sched_running_threads_contain_p(vm, add_th));
            VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(vm, add_th));

            ccan_list_add(&vm->ractor.sched.running_threads, &add_th->sched.node.running_threads);
            vm->ractor.sched.running_cnt++;
            sched->is_running = true;
        }

        if (add_timeslice_th) {
            // update timeslice threads
            int was_empty = ccan_list_empty(&vm->ractor.sched.timeslice_threads);
            VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(vm, add_timeslice_th));
            ccan_list_add(&vm->ractor.sched.timeslice_threads, &add_timeslice_th->sched.node.timeslice_threads);
            sched->is_running_timeslice = true;
            if (was_empty) {
                timer_thread_wakeup_locked(vm);
            }
        }

        if (del_timeslice_th) {
            VM_ASSERT(ractor_sched_timeslice_threads_contain_p(vm, del_timeslice_th));
            ccan_list_del_init(&del_timeslice_th->sched.node.timeslice_threads);
        }

        VM_ASSERT(ractor_sched_running_threads_size(vm) == vm->ractor.sched.running_cnt);
        VM_ASSERT(ractor_sched_timeslice_threads_size(vm) <= vm->ractor.sched.running_cnt);
    }
    ractor_sched_unlock(vm, cr);

    if (add_th && !del_th && UNLIKELY(vm->ractor.sync.lock_owner != NULL)) {
        // it can be after barrier synchronization by another ractor
        rb_thread_t *lock_owner = NULL;
#if VM_CHECK_MODE
        lock_owner = sched->lock_owner;
#endif
        thread_sched_unlock(sched, lock_owner);
        {
            RB_VM_LOCK_ENTER();
            RB_VM_LOCK_LEAVE();
        }
        thread_sched_lock(sched, lock_owner);
    }

    //RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u run:%u->%u",
    //               rb_th_serial(add_th), rb_th_serial(del_th),
    //               rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th),
    RUBY_DEBUG_LOG("run:%u->%u", prev_running_cnt, vm->ractor.sched.running_cnt);
}

static void
thread_sched_add_running_thread(struct rb_thread_sched *sched, rb_thread_t *th)
{
    ASSERT_thread_sched_locked(sched, th);
    VM_ASSERT(sched->running == th);

    rb_vm_t *vm = th->vm;
    thread_sched_setup_running_threads(sched, th->ractor, vm, th, NULL,
                                       ccan_list_empty(&sched->readyq) ?
                                       NULL : th);
}

static void
thread_sched_del_running_thread(struct rb_thread_sched *sched, rb_thread_t *th)
{
    ASSERT_thread_sched_locked(sched, th);

    rb_vm_t *vm = th->vm;
    thread_sched_setup_running_threads(sched, th->ractor, vm, NULL, th, NULL);
}

void
rb_add_running_thread(rb_thread_t *th)
{
    struct rb_thread_sched *sched = TH_SCHED(th);

    thread_sched_lock(sched, th);
    {
        thread_sched_add_running_thread(sched, th);
    }
    thread_sched_unlock(sched, th);
}

void
rb_del_running_thread(rb_thread_t *th)
{
    struct rb_thread_sched *sched = TH_SCHED(th);

    thread_sched_lock(sched, th);
    {
        thread_sched_del_running_thread(sched, th);
    }
    thread_sched_unlock(sched, th);
}

// setup current or next running thread
// sched->running should be set only by this function.
//
// if th is NULL, there are no running threads.
static void
thread_sched_set_running(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u->th:%u", rb_th_serial(sched->running), rb_th_serial(th));
    VM_ASSERT(sched->running != th);

    sched->running = th;
}

RBIMPL_ATTR_MAYBE_UNUSED()
static bool
thread_sched_readyq_contain_p(struct rb_thread_sched *sched, rb_thread_t *th)
{
    rb_thread_t *rth;
    ccan_list_for_each(&sched->readyq, rth, sched.node.readyq) {
        if (rth == th) return true;
    }
    return false;
}

// dequeue a thread from the ready queue.
// if the ready queue is empty, return NULL.
//
// return the dequeued thread (or NULL).
static rb_thread_t *
thread_sched_deq(struct rb_thread_sched *sched)
{
    ASSERT_thread_sched_locked(sched, NULL);
    rb_thread_t *next_th;

    VM_ASSERT(sched->running != NULL);

    if (ccan_list_empty(&sched->readyq)) {
        next_th = NULL;
    }
    else {
        next_th = ccan_list_pop(&sched->readyq, rb_thread_t, sched.node.readyq);

        VM_ASSERT(sched->readyq_cnt > 0);
        sched->readyq_cnt--;
        ccan_list_node_init(&next_th->sched.node.readyq);
    }

    RUBY_DEBUG_LOG("next_th:%u readyq_cnt:%d", rb_th_serial(next_th), sched->readyq_cnt);

    return next_th;
}

// enqueue a ready thread to the ready queue.
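//
// Note (inferred from the code below): when the first thread is enqueued
// while sched->running exists, the running thread is registered as a
// timeslice thread, so the timer thread only needs to preempt schedulers
// whose ready queue is actually contended.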
static void
thread_sched_enq(struct rb_thread_sched *sched, rb_thread_t *ready_th)
{
    ASSERT_thread_sched_locked(sched, NULL);
    RUBY_DEBUG_LOG("ready_th:%u readyq_cnt:%d", rb_th_serial(ready_th), sched->readyq_cnt);

    VM_ASSERT(sched->running != NULL);
    VM_ASSERT(!thread_sched_readyq_contain_p(sched, ready_th));

    if (sched->is_running) {
        if (ccan_list_empty(&sched->readyq)) {
            // add sched->running to timeslice
            thread_sched_setup_running_threads(sched, ready_th->ractor, ready_th->vm, NULL, NULL, sched->running);
        }
    }
    else {
        VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running));
    }

    ccan_list_add_tail(&sched->readyq, &ready_th->sched.node.readyq);
    sched->readyq_cnt++;
}

// DNT: kick condvar
// SNT: TODO
static void
thread_sched_wakeup_running_thread(struct rb_thread_sched *sched, rb_thread_t *next_th, bool will_switch)
{
    ASSERT_thread_sched_locked(sched, NULL);
    VM_ASSERT(sched->running == next_th);

    if (next_th) {
        if (next_th->nt) {
            if (th_has_dedicated_nt(next_th)) {
                RUBY_DEBUG_LOG("pinning th:%u", next_th->serial);
                rb_native_cond_signal(&next_th->nt->cond.readyq);
            }
            else {
                // TODO
                RUBY_DEBUG_LOG("th:%u is already running.", next_th->serial);
            }
        }
        else {
            if (will_switch) {
                RUBY_DEBUG_LOG("th:%u (do nothing)", rb_th_serial(next_th));
            }
            else {
                RUBY_DEBUG_LOG("th:%u (enq)", rb_th_serial(next_th));
                ractor_sched_enq(next_th->vm, next_th->ractor);
            }
        }
    }
    else {
        RUBY_DEBUG_LOG("no waiting threads%s", "");
    }
}

// waiting -> ready (locked)
static void
thread_sched_to_ready_common(struct rb_thread_sched *sched, rb_thread_t *th, bool wakeup, bool will_switch)
{
    RUBY_DEBUG_LOG("th:%u running:%u readyq_cnt:%d", rb_th_serial(th), rb_th_serial(sched->running), sched->readyq_cnt);

    VM_ASSERT(sched->running != th);
    VM_ASSERT(!thread_sched_readyq_contain_p(sched, th));

    RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_READY, th);

    if (sched->running == NULL) {
        thread_sched_set_running(sched, th);
        if (wakeup) thread_sched_wakeup_running_thread(sched, th, will_switch);
    }
    else {
        thread_sched_enq(sched, th);
    }
}

// waiting -> ready
//
// `th` has become "waiting" via `thread_sched_to_waiting`,
// and `thread_sched_to_ready` enqueues `th` to the thread ready queue.
RBIMPL_ATTR_MAYBE_UNUSED()
static void
thread_sched_to_ready(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    thread_sched_lock(sched, th);
    {
        thread_sched_to_ready_common(sched, th, true, false);
    }
    thread_sched_unlock(sched, th);
}

// wait until sched->running is `th`.
static void
thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, bool can_direct_transfer)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    ASSERT_thread_sched_locked(sched, th);
    VM_ASSERT(th == GET_THREAD());

    if (th != sched->running) {
        // already deleted from running threads
        // VM_ASSERT(!ractor_sched_running_threads_contain_p(th->vm, th)); // need locking

        // wait for execution right
        rb_thread_t *next_th;
        while ((next_th = sched->running) != th) {
            if (th_has_dedicated_nt(th)) {
                RUBY_DEBUG_LOG("(nt) sleep th:%u running:%u", rb_th_serial(th), rb_th_serial(sched->running));

                thread_sched_set_lock_owner(sched, NULL);
                {
                    RUBY_DEBUG_LOG("nt:%d cond:%p", th->nt->serial, &th->nt->cond.readyq);
                    rb_native_cond_wait(&th->nt->cond.readyq, &sched->lock_);
                }
                thread_sched_set_lock_owner(sched, th);

                RUBY_DEBUG_LOG("(nt) wakeup %s", sched->running == th ? "success" : "failed");

                if (th == sched->running) {
                    rb_ractor_thread_switch(th->ractor, th);
                }
            }
            else {
                // search another ready thread
                if (can_direct_transfer &&
                    (next_th = sched->running) != NULL &&
                    !next_th->nt // next_th is running or has dedicated nt
                    ) {

                    RUBY_DEBUG_LOG("th:%u->%u (direct)", rb_th_serial(th), rb_th_serial(next_th));

                    thread_sched_set_lock_owner(sched, NULL);
                    {
                        rb_ractor_set_current_ec(th->ractor, NULL);
                        thread_sched_switch(th, next_th);
                    }
                    thread_sched_set_lock_owner(sched, th);
                }
                else {
                    // search another ready ractor
                    struct rb_native_thread *nt = th->nt;
                    native_thread_assign(NULL, th);

                    RUBY_DEBUG_LOG("th:%u->%u (ractor scheduling)", rb_th_serial(th), rb_th_serial(next_th));

                    thread_sched_set_lock_owner(sched, NULL);
                    {
                        rb_ractor_set_current_ec(th->ractor, NULL);
                        coroutine_transfer(th->sched.context, nt->nt_context);
                    }
                    thread_sched_set_lock_owner(sched, th);
                }

                VM_ASSERT(GET_EC() == th->ec);
            }
        }

        VM_ASSERT(th->nt != NULL);
        VM_ASSERT(GET_EC() == th->ec);
        VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);

        // add th to running threads
        thread_sched_add_running_thread(sched, th);
    }

    // VM_ASSERT(ractor_sched_running_threads_contain_p(th->vm, th)); // need locking
    RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th);
}

// waiting -> ready -> running (locked)
static void
thread_sched_to_running_common(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u dedicated:%d", rb_th_serial(th), th_has_dedicated_nt(th));

    VM_ASSERT(sched->running != th);
    VM_ASSERT(th_has_dedicated_nt(th));
    VM_ASSERT(GET_THREAD() == th);

    native_thread_dedicated_dec(th->vm, th->ractor, th->nt);

    // waiting -> ready
    thread_sched_to_ready_common(sched, th, false, false);

    if (sched->running == th) {
        thread_sched_add_running_thread(sched, th);
    }

    // TODO: check SNT number
    thread_sched_wait_running_turn(sched, th, false);
}

// waiting -> ready -> running
//
// `th` had been waiting via `thread_sched_to_waiting()`
// and ran a dedicated task (like waitpid and so on).
// After the dedicated task, this function is called
// to rejoin normal thread scheduling.
static void
thread_sched_to_running(struct rb_thread_sched *sched, rb_thread_t *th)
{
    thread_sched_lock(sched, th);
    {
        thread_sched_to_running_common(sched, th);
    }
    thread_sched_unlock(sched, th);
}

// resume the next thread in the thread ready queue.
//
// dequeue the next running thread from the ready thread queue and
// resume it if available.
//
// If the next thread has a dedicated native thread, simply signal to resume.
// Otherwise, make the ractor ready and another nt will run the ractor and the thread.
static void
thread_sched_wakeup_next_thread(struct rb_thread_sched *sched, rb_thread_t *th, bool will_switch)
{
    ASSERT_thread_sched_locked(sched, th);

    VM_ASSERT(sched->running == th);
    VM_ASSERT(sched->running->nt != NULL);

    rb_thread_t *next_th = thread_sched_deq(sched);

    RUBY_DEBUG_LOG("next_th:%u", rb_th_serial(next_th));
    VM_ASSERT(th != next_th);

    thread_sched_set_running(sched, next_th);
    VM_ASSERT(next_th == sched->running);
    thread_sched_wakeup_running_thread(sched, next_th, will_switch);

    if (th != next_th) {
        thread_sched_del_running_thread(sched, th);
    }
}

// running -> waiting
//
// to_dead: false
//   th will run a dedicated task.
//   run another ready thread.
// to_dead: true
//   th will be dead.
//   run another ready thread.
static void
thread_sched_to_waiting_common0(struct rb_thread_sched *sched, rb_thread_t *th, bool to_dead)
{
    RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);

    if (!to_dead) native_thread_dedicated_inc(th->vm, th->ractor, th->nt);

    RUBY_DEBUG_LOG("%sth:%u", to_dead ? "to_dead " : "", rb_th_serial(th));

    bool can_switch = to_dead ? !th_has_dedicated_nt(th) : false;
    thread_sched_wakeup_next_thread(sched, th, can_switch);
}

// running -> dead (locked)
static void
thread_sched_to_dead_common(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("dedicated:%d", th->nt->dedicated);
    thread_sched_to_waiting_common0(sched, th, true);
    RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_EXITED, th);
}

// running -> dead
static void
thread_sched_to_dead(struct rb_thread_sched *sched, rb_thread_t *th)
{
    thread_sched_lock(sched, th);
    {
        thread_sched_to_dead_common(sched, th);
    }
    thread_sched_unlock(sched, th);
}

// running -> waiting (locked)
//
// This thread will run a dedicated task (th->nt->dedicated++).
static void
thread_sched_to_waiting_common(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("dedicated:%d", th->nt->dedicated);
    thread_sched_to_waiting_common0(sched, th, false);
}

// running -> waiting
//
// This thread will run a dedicated task.
static void
thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th)
{
    thread_sched_lock(sched, th);
    {
        thread_sched_to_waiting_common(sched, th);
    }
    thread_sched_unlock(sched, th);
}

// mini utility func
static void
setup_ubf(rb_thread_t *th, rb_unblock_function_t *func, void *arg)
{
    rb_native_mutex_lock(&th->interrupt_lock);
    {
        th->unblock.func = func;
        th->unblock.arg = arg;
    }
    rb_native_mutex_unlock(&th->interrupt_lock);
}

static void
ubf_waiting(void *ptr)
{
    rb_thread_t *th = (rb_thread_t *)ptr;
    struct rb_thread_sched *sched = TH_SCHED(th);

    // only once. it is safe because th->interrupt_lock is already acquired.
    th->unblock.func = NULL;
    th->unblock.arg = NULL;

    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    thread_sched_lock(sched, th);
    {
        if (sched->running == th) {
            // not sleeping yet.
        }
        else {
            thread_sched_to_ready_common(sched, th, true, false);
        }
    }
    thread_sched_unlock(sched, th);
}

// running -> waiting
//
// This thread will sleep until another thread wakes it up.
static void
thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    RB_VM_SAVE_MACHINE_CONTEXT(th);
    setup_ubf(th, ubf_waiting, (void *)th);

    RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);

    thread_sched_lock(sched, th);
    {
        if (!RUBY_VM_INTERRUPTED(th->ec)) {
            bool can_direct_transfer = !th_has_dedicated_nt(th);
            thread_sched_wakeup_next_thread(sched, th, can_direct_transfer);
            thread_sched_wait_running_turn(sched, th, can_direct_transfer);
        }
        else {
            RUBY_DEBUG_LOG("th:%u interrupted", rb_th_serial(th));
        }
    }
    thread_sched_unlock(sched, th);

    setup_ubf(th, NULL, NULL);
}

// run another thread in the ready queue.
// continue to run if there are no ready threads.
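//
// Overall scheduler state transitions, as a summary (derived from the
// function comments in this file; not an authoritative spec):
//
//   waiting --thread_sched_to_ready()-------------> ready
//   ready   --thread_sched_wait_running_turn()----> running
//   running --thread_sched_to_waiting*()----------> waiting  (dedicated task)
//   running --thread_sched_to_dead()--------------> dead
//
// sched->running always points at the single "running" thread of a
// scheduler; the readyq holds the "ready" ones.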
static void
thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%d sched->readyq_cnt:%d", (int)th->serial, sched->readyq_cnt);

    thread_sched_lock(sched, th);
    {
        if (!ccan_list_empty(&sched->readyq)) {
            RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
            thread_sched_wakeup_next_thread(sched, th, !th_has_dedicated_nt(th));
            bool can_direct_transfer = !th_has_dedicated_nt(th);
            thread_sched_to_ready_common(sched, th, false, can_direct_transfer);
            thread_sched_wait_running_turn(sched, th, can_direct_transfer);
        }
        else {
            VM_ASSERT(sched->readyq_cnt == 0);
        }
    }
    thread_sched_unlock(sched, th);
}

void
rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
{
    rb_native_mutex_initialize(&sched->lock_);

#if VM_CHECK_MODE
    sched->lock_owner = NULL;
#endif

    ccan_list_head_init(&sched->readyq);
    sched->readyq_cnt = 0;

#if USE_MN_THREADS
    if (!atfork) sched->enable_mn_threads = true; // MN is enabled on Ractors
#endif
}

static void
thread_sched_switch0(struct coroutine_context *current_cont, rb_thread_t *next_th, struct rb_native_thread *nt)
{
    VM_ASSERT(!nt->dedicated);
    VM_ASSERT(next_th->nt == NULL);

    RUBY_DEBUG_LOG("next_th:%u", rb_th_serial(next_th));

    ruby_thread_set_native(next_th);
    native_thread_assign(nt, next_th);
    coroutine_transfer(current_cont, next_th->sched.context);
}

static void
thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th)
{
    struct rb_native_thread *nt = cth->nt;
    native_thread_assign(NULL, cth);
    RUBY_DEBUG_LOG("th:%u->%u on nt:%d", rb_th_serial(cth), rb_th_serial(next_th), nt->serial);
    thread_sched_switch0(cth->sched.context, next_th, nt);
}

#if VM_CHECK_MODE > 0
RBIMPL_ATTR_MAYBE_UNUSED()
static unsigned int
grq_size(rb_vm_t *vm, rb_ractor_t *cr)
{
    ASSERT_ractor_sched_locked(vm, cr);

    rb_ractor_t *r, *prev_r = NULL;
    unsigned int i = 0;

    ccan_list_for_each(&vm->ractor.sched.grq, r, threads.sched.grq_node) {
        i++;

        VM_ASSERT(r != prev_r);
        prev_r = r;
    }
    return i;
}
#endif

static void
ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r)
{
    struct rb_thread_sched *sched = &r->threads.sched;
    rb_ractor_t *cr = NULL; // timer thread can call this function

    VM_ASSERT(sched->running != NULL);
    VM_ASSERT(sched->running->nt == NULL);

    ractor_sched_lock(vm, cr);
    {
#if VM_CHECK_MODE > 0
        // check if grq contains r
        rb_ractor_t *tr;
        ccan_list_for_each(&vm->ractor.sched.grq, tr, threads.sched.grq_node) {
            VM_ASSERT(r != tr);
        }
#endif

        ccan_list_add_tail(&vm->ractor.sched.grq, &sched->grq_node);
        vm->ractor.sched.grq_cnt++;
        VM_ASSERT(grq_size(vm, cr) == vm->ractor.sched.grq_cnt);

        RUBY_DEBUG_LOG("r:%u th:%u grq_cnt:%u", rb_ractor_id(r), rb_th_serial(sched->running), vm->ractor.sched.grq_cnt);

        rb_native_cond_signal(&vm->ractor.sched.cond);

        // ractor_sched_dump(vm);
    }
    ractor_sched_unlock(vm, cr);
}

#ifndef SNT_KEEP_SECONDS
#define SNT_KEEP_SECONDS 0
#endif

#ifndef MINIMUM_SNT
// make at least MINIMUM_SNT snts for debug.
#define MINIMUM_SNT 0
#endif

static rb_ractor_t *
ractor_sched_deq(rb_vm_t *vm, rb_ractor_t *cr)
{
    rb_ractor_t *r;

    ractor_sched_lock(vm, cr);
    {
        RUBY_DEBUG_LOG("empty? %d", ccan_list_empty(&vm->ractor.sched.grq));
        // ractor_sched_dump(vm);

        VM_ASSERT(rb_current_execution_context(false) == NULL);
        VM_ASSERT(grq_size(vm, cr) == vm->ractor.sched.grq_cnt);

        while ((r = ccan_list_pop(&vm->ractor.sched.grq, rb_ractor_t, threads.sched.grq_node)) == NULL) {
            RUBY_DEBUG_LOG("wait grq_cnt:%d", (int)vm->ractor.sched.grq_cnt);

#if SNT_KEEP_SECONDS > 0
            rb_hrtime_t abs = rb_hrtime_add(rb_hrtime_now(), RB_HRTIME_PER_SEC * SNT_KEEP_SECONDS);
            if (native_cond_timedwait(&vm->ractor.sched.cond, &vm->ractor.sched.lock, &abs) == ETIMEDOUT) {
                RUBY_DEBUG_LOG("timeout, grq_cnt:%d", (int)vm->ractor.sched.grq_cnt);
                VM_ASSERT(r == NULL);
                vm->ractor.sched.snt_cnt--;
                vm->ractor.sched.running_cnt--;
                break;
            }
            else {
                RUBY_DEBUG_LOG("wakeup grq_cnt:%d", (int)vm->ractor.sched.grq_cnt);
            }
#else
            ractor_sched_set_unlocked(vm, cr);
            rb_native_cond_wait(&vm->ractor.sched.cond, &vm->ractor.sched.lock);
            ractor_sched_set_locked(vm, cr);

            RUBY_DEBUG_LOG("wakeup grq_cnt:%d", (int)vm->ractor.sched.grq_cnt);
#endif
        }

        VM_ASSERT(rb_current_execution_context(false) == NULL);

        if (r) {
            VM_ASSERT(vm->ractor.sched.grq_cnt > 0);
            vm->ractor.sched.grq_cnt--;
            RUBY_DEBUG_LOG("r:%d grq_cnt:%u", (int)rb_ractor_id(r), vm->ractor.sched.grq_cnt);
        }
        else {
            VM_ASSERT(SNT_KEEP_SECONDS > 0); // timeout
        }
    }
    ractor_sched_unlock(vm, cr);

    return r;
}

void rb_ractor_lock_self(rb_ractor_t *r);
void rb_ractor_unlock_self(rb_ractor_t *r);

void
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
{
    // the ractor lock of cr is acquired
    // r is in sleeping status
    rb_thread_t *th = rb_ec_thread_ptr(ec);
    struct rb_thread_sched *sched = TH_SCHED(th);
    cr->sync.wait.waiting_thread = th; // TODO: multi-thread

    setup_ubf(th, ubf, (void *)cr);

    thread_sched_lock(sched, th);
    {
        rb_ractor_unlock_self(cr);
        {
            if (RUBY_VM_INTERRUPTED(th->ec)) {
                RUBY_DEBUG_LOG("interrupted");
            }
            else if (cr->sync.wait.wakeup_status != wakeup_none) {
                RUBY_DEBUG_LOG("awaken:%d", (int)cr->sync.wait.wakeup_status);
            }
            else {
                // sleep
                RB_VM_SAVE_MACHINE_CONTEXT(th);
                th->status = THREAD_STOPPED_FOREVER;

                RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);

                bool can_direct_transfer = !th_has_dedicated_nt(th);
                thread_sched_wakeup_next_thread(sched, th, can_direct_transfer);
                thread_sched_wait_running_turn(sched, th, can_direct_transfer);
                th->status = THREAD_RUNNABLE;
                // wakeup
            }
        }
    }
    thread_sched_unlock(sched, th);

    setup_ubf(th, NULL, NULL);

    rb_ractor_lock_self(cr);
    cr->sync.wait.waiting_thread = NULL;
}

void
rb_ractor_sched_wakeup(rb_ractor_t *r)
{
    rb_thread_t *r_th = r->sync.wait.waiting_thread;
    // the ractor lock of r is acquired
    struct rb_thread_sched *sched = TH_SCHED(r_th);

    VM_ASSERT(r->sync.wait.wakeup_status != 0);

    thread_sched_lock(sched, r_th);
    {
        if (r_th->status == THREAD_STOPPED_FOREVER) {
            thread_sched_to_ready_common(sched, r_th, true, false);
        }
    }
    thread_sched_unlock(sched, r_th);
}

static bool
ractor_sched_barrier_completed_p(rb_vm_t *vm)
{
    RUBY_DEBUG_LOG("run:%u wait:%u", vm->ractor.sched.running_cnt, vm->ractor.sched.barrier_waiting_cnt);
    VM_ASSERT(vm->ractor.sched.running_cnt - 1 >= vm->ractor.sched.barrier_waiting_cnt);
    return (vm->ractor.sched.running_cnt - vm->ractor.sched.barrier_waiting_cnt) == 1;
}

void
rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)
{
    VM_ASSERT(cr == GET_RACTOR());
    VM_ASSERT(vm->ractor.sync.lock_owner == cr); // VM is locked
    VM_ASSERT(!vm->ractor.sched.barrier_waiting);
    VM_ASSERT(vm->ractor.sched.barrier_waiting_cnt == 0);

    RUBY_DEBUG_LOG("start serial:%u", vm->ractor.sched.barrier_serial);
    unsigned int lock_rec;

    ractor_sched_lock(vm, cr);
    {
        vm->ractor.sched.barrier_waiting = true;

        // release the VM lock
        lock_rec = vm->ractor.sync.lock_rec;
        vm->ractor.sync.lock_rec = 0;
        vm->ractor.sync.lock_owner = NULL;
        rb_native_mutex_unlock(&vm->ractor.sync.lock);
        {
            // interrupt all running threads
            rb_thread_t *ith;
            ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) {
                if (ith->ractor != cr) {
                    RUBY_DEBUG_LOG("barrier int:%u", rb_th_serial(ith));
                    RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec);
                }
            }

            // wait for other ractors
            while (!ractor_sched_barrier_completed_p(vm)) {
                ractor_sched_set_unlocked(vm, cr);
                rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock);
                ractor_sched_set_locked(vm, cr);
            }
        }
    }
    ractor_sched_unlock(vm, cr);

    // acquire the VM lock
    rb_native_mutex_lock(&vm->ractor.sync.lock);
    vm->ractor.sync.lock_rec = lock_rec;
    vm->ractor.sync.lock_owner = cr;

    RUBY_DEBUG_LOG("completed serial:%u", vm->ractor.sched.barrier_serial);

    ractor_sched_lock(vm, cr);
    {
        vm->ractor.sched.barrier_waiting = false;
        vm->ractor.sched.barrier_serial++;
        vm->ractor.sched.barrier_waiting_cnt = 0;
        rb_native_cond_broadcast(&vm->ractor.sched.barrier_release_cond);
    }
    ractor_sched_unlock(vm, cr);
}

static void
ractor_sched_barrier_join_signal_locked(rb_vm_t *vm)
{
    if (ractor_sched_barrier_completed_p(vm)) {
        rb_native_cond_signal(&vm->ractor.sched.barrier_complete_cond);
    }
}

static void
ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th)
{
    VM_ASSERT(vm->ractor.sched.barrier_waiting);

    unsigned int barrier_serial = vm->ractor.sched.barrier_serial;

    while (vm->ractor.sched.barrier_serial == barrier_serial) {
        RUBY_DEBUG_LOG("sleep serial:%u", barrier_serial);
        RB_VM_SAVE_MACHINE_CONTEXT(th);

        rb_ractor_t *cr = th->ractor;
        ractor_sched_set_unlocked(vm, cr);
        rb_native_cond_wait(&vm->ractor.sched.barrier_release_cond, &vm->ractor.sched.lock);
        ractor_sched_set_locked(vm, cr);

        RUBY_DEBUG_LOG("wakeup serial:%u", barrier_serial);
    }
}

void
rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
{
    VM_ASSERT(cr->threads.sched.running != NULL); // running ractor
    VM_ASSERT(cr == GET_RACTOR());
    VM_ASSERT(vm->ractor.sync.lock_owner == NULL); // VM is locked, but owner == NULL
    VM_ASSERT(vm->ractor.sched.barrier_waiting);   // VM needs barrier sync

#if USE_RUBY_DEBUG_LOG || VM_CHECK_MODE > 0
    unsigned int barrier_serial = vm->ractor.sched.barrier_serial;
#endif

    RUBY_DEBUG_LOG("join");

    rb_native_mutex_unlock(&vm->ractor.sync.lock);
    {
        VM_ASSERT(vm->ractor.sched.barrier_waiting); // VM needs barrier sync
        VM_ASSERT(vm->ractor.sched.barrier_serial == barrier_serial);

        ractor_sched_lock(vm, cr);
        {
            // running_cnt
            vm->ractor.sched.barrier_waiting_cnt++;
            RUBY_DEBUG_LOG("waiting_cnt:%u serial:%u", vm->ractor.sched.barrier_waiting_cnt, barrier_serial);

            ractor_sched_barrier_join_signal_locked(vm);
            ractor_sched_barrier_join_wait_locked(vm, cr->threads.sched.running);
        }
        ractor_sched_unlock(vm, cr);
    }
    rb_native_mutex_lock(&vm->ractor.sync.lock); // VM locked here
}

#if 0
// TODO

static void clear_thread_cache_altstack(void);

static void
rb_thread_sched_destroy(struct rb_thread_sched *sched)
{
    /*
     * only called once at VM shutdown (not atfork), another thread
     * may still grab vm->gvl.lock when calling gvl_release at
     * the end of thread_start_func_2
     */
    if (0) {
        rb_native_mutex_destroy(&sched->lock);
    }
    clear_thread_cache_altstack();
}
#endif

#ifdef RB_THREAD_T_HAS_NATIVE_ID
static int
get_native_thread_id(void)
{
#ifdef __linux__
    return (int)syscall(SYS_gettid);
#elif defined(__FreeBSD__)
    return pthread_getthreadid_np();
#endif
}
#endif

#if defined(HAVE_WORKING_FORK)
static void
thread_sched_atfork(struct rb_thread_sched *sched)
{
    current_fork_gen++;
    rb_thread_sched_init(sched, true);
    rb_thread_t *th = GET_THREAD();
    rb_vm_t *vm = GET_VM();

    if (th_has_dedicated_nt(th)) {
        vm->ractor.sched.snt_cnt = 0;
    }
    else {
        vm->ractor.sched.snt_cnt = 1;
    }
    vm->ractor.sched.running_cnt = 0;

    // rb_native_cond_destroy(&vm->ractor.sched.cond);
    rb_native_cond_initialize(&vm->ractor.sched.cond);
    rb_native_cond_initialize(&vm->ractor.sched.barrier_complete_cond);
    rb_native_cond_initialize(&vm->ractor.sched.barrier_release_cond);

    ccan_list_head_init(&vm->ractor.sched.grq);
    ccan_list_head_init(&vm->ractor.sched.timeslice_threads);
    ccan_list_head_init(&vm->ractor.sched.running_threads);

    VM_ASSERT(sched->is_running);
    sched->is_running_timeslice = false;

    if (sched->running != th) {
        thread_sched_to_running(sched, th);
    }
    else {
        thread_sched_setup_running_threads(sched, th->ractor, vm, th, NULL, NULL);
    }

#ifdef RB_THREAD_T_HAS_NATIVE_ID
    if (th->nt) {
        th->nt->tid = get_native_thread_id();
    }
#endif
}

#endif

#ifdef RB_THREAD_LOCAL_SPECIFIER
static RB_THREAD_LOCAL_SPECIFIER rb_thread_t *ruby_native_thread;
#else
static pthread_key_t ruby_native_thread_key;
#endif

static void
null_func(int i)
{
    /* null */
    // This function can be called from signal handlers
    // RUBY_DEBUG_LOG("i:%d", i);
}

rb_thread_t *
ruby_thread_from_native(void)
{
#ifdef RB_THREAD_LOCAL_SPECIFIER
    return ruby_native_thread;
#else
    return pthread_getspecific(ruby_native_thread_key);
#endif
}

int
ruby_thread_set_native(rb_thread_t *th)
{
    if (th) {
#ifdef USE_UBF_LIST
        ccan_list_node_init(&th->sched.node.ubf);
#endif
    }

    // setup TLS

    if (th && th->ec) {
        rb_ractor_set_current_ec(th->ractor, th->ec);
    }

#ifdef RB_THREAD_LOCAL_SPECIFIER
    ruby_native_thread = th;
    return 1;
#else
    return pthread_setspecific(ruby_native_thread_key, th) == 0;
#endif
}

static void native_thread_setup(struct rb_native_thread *nt);
static void native_thread_setup_on_thread(struct rb_native_thread *nt);

void
Init_native_thread(rb_thread_t *main_th)
{
#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK)
    if (condattr_monotonic) {
        int r = pthread_condattr_init(condattr_monotonic);
        if (r == 0) {
            r = pthread_condattr_setclock(condattr_monotonic, CLOCK_MONOTONIC);
        }
        if (r) condattr_monotonic = NULL;
    }
#endif

#ifndef RB_THREAD_LOCAL_SPECIFIER
    if (pthread_key_create(&ruby_native_thread_key, 0) == EAGAIN) {
        rb_bug("pthread_key_create failed (ruby_native_thread_key)");
    }
    if (pthread_key_create(&ruby_current_ec_key, 0) == EAGAIN) {
        rb_bug("pthread_key_create failed (ruby_current_ec_key)");
    }
#endif
    ruby_posix_signal(SIGVTALRM, null_func);

    // setup vm
    rb_vm_t *vm = main_th->vm;
    rb_native_mutex_initialize(&vm->ractor.sched.lock);
    rb_native_cond_initialize(&vm->ractor.sched.cond);
    rb_native_cond_initialize(&vm->ractor.sched.barrier_complete_cond);
    rb_native_cond_initialize(&vm->ractor.sched.barrier_release_cond);

    ccan_list_head_init(&vm->ractor.sched.grq);
    ccan_list_head_init(&vm->ractor.sched.timeslice_threads);
    ccan_list_head_init(&vm->ractor.sched.running_threads);

    // setup main thread
    main_th->nt->thread_id = pthread_self();
    main_th->nt->serial = 1;
#ifdef RUBY_NT_SERIAL
    ruby_nt_serial = 1;
#endif
    ruby_thread_set_native(main_th);
    native_thread_setup(main_th->nt);
    native_thread_setup_on_thread(main_th->nt);

    TH_SCHED(main_th)->running = main_th;
    main_th->has_dedicated_nt = 1;

    thread_sched_setup_running_threads(TH_SCHED(main_th), main_th->ractor, vm, main_th, NULL, NULL);
    // setup main NT
    main_th->nt->dedicated = 1;
    main_th->nt->vm = vm;

    // setup mn
    vm->ractor.sched.dnt_cnt = 1;
}

extern int ruby_mn_threads_enabled;

void
ruby_mn_threads_params(void)
{
    rb_vm_t *vm = GET_VM();
    rb_ractor_t *main_ractor = GET_RACTOR();

    const char *mn_threads_cstr = getenv("RUBY_MN_THREADS");
    bool enable_mn_threads = false;

    if (USE_MN_THREADS && mn_threads_cstr && (enable_mn_threads = atoi(mn_threads_cstr) > 0)) {
        // enabled
        ruby_mn_threads_enabled = 1;
    }
    main_ractor->threads.sched.enable_mn_threads = enable_mn_threads;

    const char *max_cpu_cstr = getenv("RUBY_MAX_CPU");
    const int default_max_cpu = 8; // TODO: CPU num?
    int max_cpu = default_max_cpu;

    // use the RUBY_MAX_CPU value when it parses to a positive number;
    // otherwise fall back to the default.
    if (USE_MN_THREADS && max_cpu_cstr && (max_cpu = atoi(max_cpu_cstr)) <= 0) {
        max_cpu = default_max_cpu;
    }

    vm->ractor.sched.max_cpu = max_cpu;
}

static void
native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt)
{
    RUBY_DEBUG_LOG("nt:%d %d->%d", nt->serial, nt->dedicated, nt->dedicated + 1);

    if (nt->dedicated == 0) {
        ractor_sched_lock(vm, cr);
        {
            vm->ractor.sched.snt_cnt--;
            vm->ractor.sched.dnt_cnt++;
        }
        ractor_sched_unlock(vm, cr);
    }

    nt->dedicated++;
}

static void
native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt)
{
    RUBY_DEBUG_LOG("nt:%d %d->%d", nt->serial, nt->dedicated, nt->dedicated - 1);
    VM_ASSERT(nt->dedicated > 0);
    nt->dedicated--;

    if (nt->dedicated == 0) {
        ractor_sched_lock(vm, cr);
        {
            nt->vm->ractor.sched.snt_cnt++;
            nt->vm->ractor.sched.dnt_cnt--;
        }
        ractor_sched_unlock(vm, cr);
    }
}

static void
native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th)
{
#if USE_RUBY_DEBUG_LOG
    if (nt) {
        if (th->nt) {
            RUBY_DEBUG_LOG("th:%d nt:%d->%d", (int)th->serial, (int)th->nt->serial, (int)nt->serial);
        }
        else {
            RUBY_DEBUG_LOG("th:%d nt:NULL->%d", (int)th->serial, (int)nt->serial);
        }
    }
    else {
        if (th->nt) {
            RUBY_DEBUG_LOG("th:%d nt:%d->NULL", (int)th->serial, (int)th->nt->serial);
        }
        else {
            RUBY_DEBUG_LOG("th:%d nt:NULL->NULL", (int)th->serial);
        }
    }
#endif

    th->nt = nt;
}

static void
native_thread_destroy(struct rb_native_thread *nt)
{
    if (nt) {
        rb_native_cond_destroy(&nt->cond.readyq);

        if (&nt->cond.readyq != &nt->cond.intr) {
            rb_native_cond_destroy(&nt->cond.intr);
        }

        RB_ALTSTACK_FREE(nt->altstack);
        ruby_xfree(nt->nt_context);
        ruby_xfree(nt);
    }
}

#if defined HAVE_PTHREAD_GETATTR_NP || defined HAVE_PTHREAD_ATTR_GET_NP
#define STACKADDR_AVAILABLE 1
#elif defined HAVE_PTHREAD_GET_STACKADDR_NP && defined HAVE_PTHREAD_GET_STACKSIZE_NP
#define STACKADDR_AVAILABLE 1
#undef MAINSTACKADDR_AVAILABLE
#define MAINSTACKADDR_AVAILABLE 1
void *pthread_get_stackaddr_np(pthread_t);
size_t pthread_get_stacksize_np(pthread_t);
#elif defined HAVE_THR_STKSEGMENT || defined HAVE_PTHREAD_STACKSEG_NP
#define STACKADDR_AVAILABLE 1
#elif defined HAVE_PTHREAD_GETTHRDS_NP
#define STACKADDR_AVAILABLE 1
#elif defined __HAIKU__
#define STACKADDR_AVAILABLE 1
#endif

#ifndef MAINSTACKADDR_AVAILABLE
# ifdef STACKADDR_AVAILABLE
#   define MAINSTACKADDR_AVAILABLE 1
# else
#   define MAINSTACKADDR_AVAILABLE 0
# endif
#endif
#if MAINSTACKADDR_AVAILABLE && !defined(get_main_stack)
# define get_main_stack(addr, size) get_stack(addr, size)
#endif

#ifdef STACKADDR_AVAILABLE
/*
 * Get the initial address and size of current thread's stack
 */
static int
get_stack(void **addr, size_t *size)
{
#define CHECK_ERR(expr) \
    {int err = (expr); if (err) return err;}
#ifdef HAVE_PTHREAD_GETATTR_NP /* Linux */
    pthread_attr_t attr;
    size_t guard = 0;
    STACK_GROW_DIR_DETECTION;
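    // Linux: pthread_getattr_np() reports the whole stack mapping including
    // the guard area, so the guard size is subtracted below to keep Ruby
    // from treating the guard page(s) as usable stack.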
    CHECK_ERR(pthread_getattr_np(pthread_self(), &attr));
# ifdef HAVE_PTHREAD_ATTR_GETSTACK
    CHECK_ERR(pthread_attr_getstack(&attr, addr, size));
    STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size));
# else
    CHECK_ERR(pthread_attr_getstackaddr(&attr, addr));
    CHECK_ERR(pthread_attr_getstacksize(&attr, size));
# endif
# ifdef HAVE_PTHREAD_ATTR_GETGUARDSIZE
    CHECK_ERR(pthread_attr_getguardsize(&attr, &guard));
# else
    guard = getpagesize();
# endif
    *size -= guard;
    pthread_attr_destroy(&attr);
#elif defined HAVE_PTHREAD_ATTR_GET_NP /* FreeBSD, DragonFly BSD, NetBSD */
    pthread_attr_t attr;
    CHECK_ERR(pthread_attr_init(&attr));
    CHECK_ERR(pthread_attr_get_np(pthread_self(), &attr));
# ifdef HAVE_PTHREAD_ATTR_GETSTACK
    CHECK_ERR(pthread_attr_getstack(&attr, addr, size));
# else
    CHECK_ERR(pthread_attr_getstackaddr(&attr, addr));
    CHECK_ERR(pthread_attr_getstacksize(&attr, size));
# endif
    STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size));
    pthread_attr_destroy(&attr);
#elif (defined HAVE_PTHREAD_GET_STACKADDR_NP && defined HAVE_PTHREAD_GET_STACKSIZE_NP) /* MacOS X */
    pthread_t th = pthread_self();
    *addr = pthread_get_stackaddr_np(th);
    *size = pthread_get_stacksize_np(th);
#elif defined HAVE_THR_STKSEGMENT || defined HAVE_PTHREAD_STACKSEG_NP
    stack_t stk;
# if defined HAVE_THR_STKSEGMENT /* Solaris */
    CHECK_ERR(thr_stksegment(&stk));
# else /* OpenBSD */
    CHECK_ERR(pthread_stackseg_np(pthread_self(), &stk));
# endif
    *addr = stk.ss_sp;
    *size = stk.ss_size;
#elif defined HAVE_PTHREAD_GETTHRDS_NP /* AIX */
    pthread_t th = pthread_self();
    struct __pthrdsinfo thinfo;
    char reg[256];
    int regsiz = sizeof(reg);
    CHECK_ERR(pthread_getthrds_np(&th, PTHRDSINFO_QUERY_ALL,
                                  &thinfo, sizeof(thinfo),
                                  &reg, &regsiz));
    *addr = thinfo.__pi_stackaddr;
    /* Must not use thinfo.__pi_stacksize for size.
       It is around 3KB smaller than the correct size
       calculated by thinfo.__pi_stackend - thinfo.__pi_stackaddr. */
    *size = thinfo.__pi_stackend - thinfo.__pi_stackaddr;
    STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size));
#elif defined __HAIKU__
    thread_info info;
    STACK_GROW_DIR_DETECTION;
    CHECK_ERR(get_thread_info(find_thread(NULL), &info));
    *addr = info.stack_base;
    *size = (uintptr_t)info.stack_end - (uintptr_t)info.stack_base;
    STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size));
#else
#error STACKADDR_AVAILABLE is defined but not implemented.
#endif
    return 0;
#undef CHECK_ERR
}
#endif

static struct {
    rb_nativethread_id_t id;
    size_t stack_maxsize;
    VALUE *stack_start;
} native_main_thread;

#ifdef STACK_END_ADDRESS
extern void *STACK_END_ADDRESS;
#endif

enum {
    RUBY_STACK_SPACE_LIMIT = 1024 * 1024, /* 1024KB */
    RUBY_STACK_SPACE_RATIO = 5
};

static size_t
space_size(size_t stack_size)
{
    size_t space_size = stack_size / RUBY_STACK_SPACE_RATIO;
    if (space_size > RUBY_STACK_SPACE_LIMIT) {
        return RUBY_STACK_SPACE_LIMIT;
    }
    else {
        return space_size;
    }
}

#ifdef __linux__
static __attribute__((noinline)) void
reserve_stack(volatile char *limit, size_t size)
{
# ifdef C_ALLOCA
#   error needs alloca()
# endif
    struct rlimit rl;
    volatile char buf[0x100];
    enum {stack_check_margin = 0x1000}; /* for -fstack-check */

    STACK_GROW_DIR_DETECTION;

    if (!getrlimit(RLIMIT_STACK, &rl) && rl.rlim_cur == RLIM_INFINITY)
        return;

    if (size < stack_check_margin) return;
    size -= stack_check_margin;

    size -= sizeof(buf); /* margin */
    if (IS_STACK_DIR_UPPER()) {
        const volatile char *end = buf + sizeof(buf);
        limit += size;
        if (limit > end) {
            /* |<-bottom (=limit(a))                                     top->|
             * | .. |<-buf 256B |<-end                          | stack check |
             * | 256B  |              =size=                    | margin (4KB)|
             * |              =size=         limit(b)->|  256B  |             |
             * |                |       alloca(sz)     |        |             |
             * | .. |<-buf      |<-limit(c)    [sz-1]->0>       |             |
             */
            size_t sz = limit - end;
            limit = alloca(sz);
            limit[sz-1] = 0;
        }
    }
    else {
        limit -= size;
        if (buf > limit) {
            /* |<-top (=limit(a))                                     bottom->|
             * | .. | 256B buf->|                               | stack check |
             * | 256B  |              =size=                    | margin (4KB)|
             * |              =size=         limit(b)->|  256B  |             |
             * |                |       alloca(sz)     |        |             |
             * | .. |      buf->|          limit(c)-><0>        |             |
             */
            size_t sz = buf - limit;
            limit = alloca(sz);
            limit[0] = 0;
        }
    }
}
#else
# define reserve_stack(limit, size) ((void)(limit), (void)(size))
#endif

#undef ruby_init_stack
void
ruby_init_stack(volatile VALUE *addr)
{
    native_main_thread.id = pthread_self();

#if MAINSTACKADDR_AVAILABLE
    if (native_main_thread.stack_maxsize) return;
    {
        void* stackaddr;
        size_t size;
        if (get_main_stack(&stackaddr, &size) == 0) {
            native_main_thread.stack_maxsize = size;
            native_main_thread.stack_start = stackaddr;
            reserve_stack(stackaddr, size);
            goto bound_check;
        }
    }
#endif
#ifdef STACK_END_ADDRESS
    native_main_thread.stack_start = STACK_END_ADDRESS;
#else
    if (!native_main_thread.stack_start ||
        STACK_UPPER((VALUE *)(void *)&addr,
                    native_main_thread.stack_start > addr,
                    native_main_thread.stack_start < addr)) {
        native_main_thread.stack_start = (VALUE *)addr;
    }
#endif
    {
#if defined(HAVE_GETRLIMIT)
#if defined(PTHREAD_STACK_DEFAULT)
# if PTHREAD_STACK_DEFAULT < RUBY_STACK_SPACE*5
#  error "PTHREAD_STACK_DEFAULT is too small"
# endif
        size_t size = PTHREAD_STACK_DEFAULT;
#else
        size_t size = RUBY_VM_THREAD_VM_STACK_SIZE;
#endif
        size_t space;
        int pagesize = getpagesize();
        struct rlimit rlim;
        STACK_GROW_DIR_DETECTION;
        if (getrlimit(RLIMIT_STACK, &rlim) == 0) {
            size = (size_t)rlim.rlim_cur;
        }
        addr = native_main_thread.stack_start;
        if (IS_STACK_DIR_UPPER()) {
            space = ((size_t)((char *)addr + size) / pagesize) * pagesize - (size_t)addr;
        }
        else {
            space = (size_t)addr - ((size_t)((char *)addr - size) / pagesize + 1) * pagesize;
        }
        native_main_thread.stack_maxsize = space;
#endif
    }

#if MAINSTACKADDR_AVAILABLE
  bound_check:
#endif
    /* If addr is out of range of main-thread stack range estimation,  */
    /* it should be on co-routine (alternative stack).
       [Feature #2294] */
    {
        void *start, *end;
        STACK_GROW_DIR_DETECTION;

        if (IS_STACK_DIR_UPPER()) {
            start = native_main_thread.stack_start;
            end = (char *)native_main_thread.stack_start + native_main_thread.stack_maxsize;
        }
        else {
            start = (char *)native_main_thread.stack_start - native_main_thread.stack_maxsize;
            end = native_main_thread.stack_start;
        }

        if ((void *)addr < start || (void *)addr > end) {
            /* out of range */
            native_main_thread.stack_start = (VALUE *)addr;
            native_main_thread.stack_maxsize = 0; /* unknown */
        }
    }
}

#define CHECK_ERR(expr) \
    {int err = (expr); if (err) {rb_bug_errno(#expr, err);}}

static int
native_thread_init_stack(rb_thread_t *th)
{
    rb_nativethread_id_t curr = pthread_self();

    if (pthread_equal(curr, native_main_thread.id)) {
        th->ec->machine.stack_start = native_main_thread.stack_start;
        th->ec->machine.stack_maxsize = native_main_thread.stack_maxsize;
    }
    else {
#ifdef STACKADDR_AVAILABLE
        if (th_has_dedicated_nt(th)) {
            void *start;
            size_t size;

            if (get_stack(&start, &size) == 0) {
                uintptr_t diff = (uintptr_t)start - (uintptr_t)&curr;
                th->ec->machine.stack_start = (VALUE *)&curr;
                th->ec->machine.stack_maxsize = size - diff;
            }
        }
#else
        rb_raise(rb_eNotImpError, "ruby engine can initialize only in the main thread");
#endif
    }

    return 0;
}

struct nt_param {
    rb_vm_t *vm;
    struct rb_native_thread *nt;
};

static void *
nt_start(void *ptr);

static int
native_thread_create0(struct rb_native_thread *nt)
{
    int err = 0;
    pthread_attr_t attr;

    const size_t stack_size = nt->vm->default_params.thread_machine_stack_size;
    const size_t space = space_size(stack_size);

    nt->machine_stack_maxsize = stack_size - space;

#ifdef USE_SIGALTSTACK
    nt->altstack = rb_allocate_sigaltstack();
#endif

    CHECK_ERR(pthread_attr_init(&attr));

# ifdef PTHREAD_STACK_MIN
    RUBY_DEBUG_LOG("stack size: %lu", (unsigned long)stack_size);
    CHECK_ERR(pthread_attr_setstacksize(&attr, stack_size));
# endif

# ifdef HAVE_PTHREAD_ATTR_SETINHERITSCHED
    CHECK_ERR(pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED));
# endif
    CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));

    err = pthread_create(&nt->thread_id, &attr, nt_start, nt);

    RUBY_DEBUG_LOG("nt:%d err:%d", (int)nt->serial, err);

    CHECK_ERR(pthread_attr_destroy(&attr));

    return err;
}

static void
native_thread_setup(struct rb_native_thread *nt)
{
    // init cond
    rb_native_cond_initialize(&nt->cond.readyq);

    if (&nt->cond.readyq != &nt->cond.intr) {
        rb_native_cond_initialize(&nt->cond.intr);
    }
}

static void
native_thread_setup_on_thread(struct rb_native_thread *nt)
{
    // init tid
#ifdef RB_THREAD_T_HAS_NATIVE_ID
    nt->tid = get_native_thread_id();
#endif

    // init signal handler
    RB_ALTSTACK_INIT(nt->altstack, nt->altstack);
}

static struct rb_native_thread *
native_thread_alloc(void)
{
    struct rb_native_thread *nt = ZALLOC(struct rb_native_thread);
    native_thread_setup(nt);

#if USE_MN_THREADS
    nt->nt_context = ruby_xmalloc(sizeof(struct coroutine_context));
#endif

#if USE_RUBY_DEBUG_LOG
    static rb_atomic_t nt_serial = 2;
    nt->serial = RUBY_ATOMIC_FETCH_ADD(nt_serial, 1);
#endif
    return nt;
}

static int
native_thread_create_dedicated(rb_thread_t *th)
{
    th->nt = native_thread_alloc();
    th->nt->vm = th->vm;
    th->nt->running_thread = th;
    th->nt->dedicated = 1;

    // vm stack
    size_t vm_stack_word_size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
    void *vm_stack = ruby_xmalloc(vm_stack_word_size * sizeof(VALUE));
    th->sched.malloc_stack = true;
    rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_word_size);
    th->sched.context_stack = vm_stack;

    // setup
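    // Mark th as ready before the pthread exists; the native thread created
    // by native_thread_create0() below will block in nt_start() /
    // thread_sched_wait_running_turn() until th actually becomes
    // sched->running (see the dedicated branch of nt_start()).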
    thread_sched_to_ready(TH_SCHED(th), th);

    return native_thread_create0(th->nt);
}

static void
call_thread_start_func_2(rb_thread_t *th)
{
    native_thread_init_stack(th);
    thread_start_func_2(th, th->ec->machine.stack_start);
}

static void *
nt_start(void *ptr)
{
    struct rb_native_thread *nt = (struct rb_native_thread *)ptr;
    rb_vm_t *vm = nt->vm;

    native_thread_setup_on_thread(nt);

    // init tid
#ifdef RB_THREAD_T_HAS_NATIVE_ID
    nt->tid = get_native_thread_id();
#endif

#if USE_RUBY_DEBUG_LOG && defined(RUBY_NT_SERIAL)
    ruby_nt_serial = nt->serial;
#endif

    RUBY_DEBUG_LOG("nt:%u", nt->serial);

    if (!nt->dedicated) {
        coroutine_initialize_main(nt->nt_context);
    }

    while (1) {
        if (nt->dedicated) {
            // wait running turn
            rb_thread_t *th = nt->running_thread;
            struct rb_thread_sched *sched = TH_SCHED(th);

            RUBY_DEBUG_LOG("on dedicated th:%u", rb_th_serial(th));
            ruby_thread_set_native(th);

            thread_sched_lock(sched, th);
            {
                if (sched->running == th) {
                    thread_sched_add_running_thread(sched, th);
                }
                thread_sched_wait_running_turn(sched, th, false);
            }
            thread_sched_unlock(sched, th);

            // start threads
            call_thread_start_func_2(th);
            break; // TODO: allow to change to the SNT
        }
        else {
            RUBY_DEBUG_LOG("check next");
            rb_ractor_t *r = ractor_sched_deq(vm, NULL);

            if (r) {
                struct rb_thread_sched *sched = &r->threads.sched;

                thread_sched_lock(sched, NULL);
                {
                    rb_thread_t *next_th = sched->running;

                    if (next_th && next_th->nt == NULL) {
                        RUBY_DEBUG_LOG("nt:%d next_th:%d", (int)nt->serial, (int)next_th->serial);
                        thread_sched_switch0(nt->nt_context, next_th, nt);
                    }
                    else {
                        RUBY_DEBUG_LOG("no schedulable threads -- next_th:%p", next_th);
                    }
                }
                thread_sched_unlock(sched, NULL);
            }
            else {
                // timeout -> deleted.
                break;
            }
        }
    }

    return NULL;
}

static int native_thread_create_shared(rb_thread_t *th);

#if USE_MN_THREADS
static void nt_free_stack(void *mstack);
#endif

void
rb_threadptr_remove(rb_thread_t *th)
{
#if USE_MN_THREADS
    if (th->sched.malloc_stack) {
        // dedicated
        return;
    }
    else {
        rb_vm_t *vm = th->vm;
        th->sched.finished = false;

        RB_VM_LOCK_ENTER();
        {
            ccan_list_add(&vm->ractor.sched.zombie_threads, &th->sched.node.zombie_threads);
        }
        RB_VM_LOCK_LEAVE();
    }
#endif
}

void
rb_threadptr_sched_free(rb_thread_t *th)
{
#if USE_MN_THREADS
    if (th->sched.malloc_stack) {
        // has dedicated
        ruby_xfree(th->sched.context_stack);
        native_thread_destroy(th->nt);
    }
    else {
        nt_free_stack(th->sched.context_stack);
        // TODO: how to free nt and nt->altstack?
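        // (Presumably the reason for the TODO: a shared NT may still be
        // running other threads' coroutines at this point, so its struct
        // rb_native_thread and altstack cannot simply be freed here.)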
    }

    if (th->sched.context) {
        ruby_xfree(th->sched.context);
        VM_ASSERT((th->sched.context = NULL) == NULL);
    }
#else
    ruby_xfree(th->sched.context_stack);
    native_thread_destroy(th->nt);
#endif

    th->nt = NULL;
}

void
rb_thread_sched_mark_zombies(rb_vm_t *vm)
{
    if (!ccan_list_empty(&vm->ractor.sched.zombie_threads)) {
        rb_thread_t *zombie_th, *next_zombie_th;
        ccan_list_for_each_safe(&vm->ractor.sched.zombie_threads, zombie_th, next_zombie_th, sched.node.zombie_threads) {
            if (zombie_th->sched.finished) {
                ccan_list_del_init(&zombie_th->sched.node.zombie_threads);
            }
            else {
                rb_gc_mark(zombie_th->self);
            }
        }
    }
}

static int
native_thread_create(rb_thread_t *th)
{
    VM_ASSERT(th->nt == 0);
    RUBY_DEBUG_LOG("th:%d has_dnt:%d", th->serial, th->has_dedicated_nt);
    RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_STARTED, th);

    if (!th->ractor->threads.sched.enable_mn_threads) {
        th->has_dedicated_nt = 1;
    }

    if (th->has_dedicated_nt) {
        return native_thread_create_dedicated(th);
    }
    else {
        return native_thread_create_shared(th);
    }
}

#if USE_NATIVE_THREAD_PRIORITY

static void
native_thread_apply_priority(rb_thread_t *th)
{
#if defined(_POSIX_PRIORITY_SCHEDULING) && (_POSIX_PRIORITY_SCHEDULING > 0)
    struct sched_param sp;
    int policy;
    int priority = 0 - th->priority;
    int max, min;
    pthread_getschedparam(th->nt->thread_id, &policy, &sp);
    max = sched_get_priority_max(policy);
    min = sched_get_priority_min(policy);

    if (min > priority) {
        priority = min;
    }
    else if (max < priority) {
        priority = max;
    }

    sp.sched_priority = priority;
    pthread_setschedparam(th->nt->thread_id, policy, &sp);
#else
    /* not touched */
#endif
}

#endif /* USE_NATIVE_THREAD_PRIORITY */

static int
native_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout, rb_thread_t *th)
{
    return rb_fd_select(n, readfds, writefds, exceptfds, timeout);
}

static void
ubf_pthread_cond_signal(void *ptr)
{
    rb_thread_t *th = (rb_thread_t *)ptr;
    RUBY_DEBUG_LOG("th:%u on nt:%d", rb_th_serial(th), (int)th->nt->serial);
    rb_native_cond_signal(&th->nt->cond.intr);
}

static void
native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel)
{
    rb_nativethread_lock_t *lock = &th->interrupt_lock;
    rb_nativethread_cond_t *cond = &th->nt->cond.intr;

    /* Solaris cond_timedwait() returns EINVAL if an argument is greater than
     * current_time + 100,000,000.  So cut up to 100,000,000.  This is
     * considered as a kind of spurious wakeup.  The caller to native_sleep
     * should care about spurious wakeup.
     *
     * See also [Bug #1341] [ruby-core:29702]
     * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html
     */
    const rb_hrtime_t max = (rb_hrtime_t)100000000 * RB_HRTIME_PER_SEC;

    THREAD_BLOCKING_BEGIN(th);
    {
        rb_native_mutex_lock(lock);
        th->unblock.func = ubf_pthread_cond_signal;
        th->unblock.arg = th;

        if (RUBY_VM_INTERRUPTED(th->ec)) {
            /* interrupted. return immediately */
static void
native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel)
{
    rb_nativethread_lock_t *lock = &th->interrupt_lock;
    rb_nativethread_cond_t *cond = &th->nt->cond.intr;

    /* Solaris' cond_timedwait() returns EINVAL if an argument is greater than
     * current_time + 100,000,000 seconds. So we cap the relative timeout at
     * 100,000,000 seconds; waking up early from the cap is treated as a kind
     * of spurious wakeup. Callers of native_sleep must already tolerate
     * spurious wakeups.
     *
     * See also [Bug #1341] [ruby-core:29702]
     * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html
     */
    const rb_hrtime_t max = (rb_hrtime_t)100000000 * RB_HRTIME_PER_SEC;

    THREAD_BLOCKING_BEGIN(th);
    {
        rb_native_mutex_lock(lock);
        th->unblock.func = ubf_pthread_cond_signal;
        th->unblock.arg = th;

        if (RUBY_VM_INTERRUPTED(th->ec)) {
            /* interrupted; return immediately */
            RUBY_DEBUG_LOG("interrupted before sleep th:%u", rb_th_serial(th));
        }
        else {
            if (!rel) {
                rb_native_cond_wait(cond, lock);
            }
            else {
                rb_hrtime_t end;

                if (*rel > max) {
                    *rel = max;
                }

                end = native_cond_timeout(cond, *rel);
                native_cond_timedwait(cond, lock, &end);
            }
        }
        th->unblock.func = 0;

        rb_native_mutex_unlock(lock);
    }
    THREAD_BLOCKING_END(th);

    RUBY_DEBUG_LOG("done th:%u", rb_th_serial(th));
}

#ifdef USE_UBF_LIST
static CCAN_LIST_HEAD(ubf_list_head);
static rb_nativethread_lock_t ubf_list_lock = RB_NATIVETHREAD_LOCK_INIT;

static void
ubf_list_atfork(void)
{
    ccan_list_head_init(&ubf_list_head);
    rb_native_mutex_initialize(&ubf_list_lock);
}

RBIMPL_ATTR_MAYBE_UNUSED()
static bool
ubf_list_contain_p(rb_thread_t *th)
{
    rb_thread_t *list_th;
    ccan_list_for_each(&ubf_list_head, list_th, sched.node.ubf) {
        if (list_th == th) return true;
    }
    return false;
}

/* Register the thread 'th' as one we are trying to unblock. */
static void
register_ubf_list(rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
    struct ccan_list_node *node = &th->sched.node.ubf;

    VM_ASSERT(th->unblock.func != NULL);

    rb_native_mutex_lock(&ubf_list_lock);
    {
        // add only if not connected yet
        if (ccan_list_empty((struct ccan_list_head*)node)) {
            VM_ASSERT(!ubf_list_contain_p(th));
            ccan_list_add(&ubf_list_head, node);
        }
    }
    rb_native_mutex_unlock(&ubf_list_lock);

    timer_thread_wakeup();
}

/* The thread 'th' is unblocked. It no longer needs to be registered. */
static void
unregister_ubf_list(rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
    struct ccan_list_node *node = &th->sched.node.ubf;

    /* we can't allow re-entry into ubf_list_head */
    VM_ASSERT(th->unblock.func == NULL);

    if (!ccan_list_empty((struct ccan_list_head*)node)) {
        rb_native_mutex_lock(&ubf_list_lock);
        {
            VM_ASSERT(ubf_list_contain_p(th));
            ccan_list_del_init(node);
        }
        rb_native_mutex_unlock(&ubf_list_lock);
    }
}

/*
 * Send a signal so that the target thread returns from a blocking syscall.
 * Any signal would probably do, but we chose SIGVTALRM.
 */
static void
ubf_wakeup_thread(rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u thread_id:%p", rb_th_serial(th), (void *)th->nt->thread_id);
    pthread_kill(th->nt->thread_id, SIGVTALRM);
}

static void
ubf_select(void *ptr)
{
    rb_thread_t *th = (rb_thread_t *)ptr;
    RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(th));
    ubf_wakeup_thread(th);
    register_ubf_list(th);
}

static bool
ubf_threads_empty(void)
{
    return ccan_list_empty(&ubf_list_head) != 0;
}

static void
ubf_wakeup_all_threads(void)
{
    if (!ubf_threads_empty()) {
        rb_thread_t *th;
        rb_native_mutex_lock(&ubf_list_lock);
        {
            ccan_list_for_each(&ubf_list_head, th, sched.node.ubf) {
                ubf_wakeup_thread(th);
            }
        }
        rb_native_mutex_unlock(&ubf_list_lock);
    }
}

#else /* USE_UBF_LIST */
#define register_ubf_list(th) (void)(th)
#define unregister_ubf_list(th) (void)(th)
#define ubf_select 0
static void ubf_wakeup_all_threads(void) { return; }
static bool ubf_threads_empty(void) { return true; }
#define ubf_list_atfork() do {} while (0)
#endif /* USE_UBF_LIST */

#define TT_DEBUG 0
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
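/*
 * Note on ubf_select(): it both signals the target and registers it on
 * ubf_list_head because a single SIGVTALRM can be delivered before the
 * target actually enters its blocking syscall, losing the wakeup. While
 * the thread stays registered, the timer thread keeps re-signaling it via
 * ubf_wakeup_all_threads() until unregister_ubf_list() runs, so delivery
 * only has to succeed eventually, not exactly once.
 */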
void
rb_thread_wakeup_timer_thread(int sig)
{
    // This function can be called from signal handlers, so
    // pthread_mutex_lock() should not be used here.

    // wake up the timer thread
    timer_thread_wakeup_force();

    // interrupt the main thread if it is available
    if (system_working) {
        rb_vm_t *vm = GET_VM();
        rb_thread_t *main_th = vm->ractor.main_thread;

        if (main_th) {
            volatile rb_execution_context_t *main_th_ec = ACCESS_ONCE(rb_execution_context_t *, main_th->ec);

            if (main_th_ec) {
                RUBY_VM_SET_TRAP_INTERRUPT(main_th_ec);

                if (vm->ubf_async_safe && main_th->unblock.func) {
                    (main_th->unblock.func)(main_th->unblock.arg);
                }
            }
        }
    }
}

#define CLOSE_INVALIDATE_PAIR(expr) \
    close_invalidate_pair(expr,"close_invalidate: "#expr)
static void
close_invalidate(int *fdp, const char *msg)
{
    int fd = *fdp;

    *fdp = -1;
    if (close(fd) < 0) {
        async_bug_fd(msg, errno, fd);
    }
}

static void
close_invalidate_pair(int fds[2], const char *msg)
{
    if (USE_EVENTFD && fds[0] == fds[1]) {
        fds[1] = -1; // disable write port first
        close_invalidate(&fds[0], msg);
    }
    else {
        close_invalidate(&fds[1], msg);
        close_invalidate(&fds[0], msg);
    }
}

static void
set_nonblock(int fd)
{
    int oflags;
    int err;

    oflags = fcntl(fd, F_GETFL);
    if (oflags == -1)
        rb_sys_fail(0);
    oflags |= O_NONBLOCK;
    err = fcntl(fd, F_SETFL, oflags);
    if (err == -1)
        rb_sys_fail(0);
}

/* communication pipe with timer thread and signal handler */
static void
setup_communication_pipe_internal(int pipes[2])
{
    int err;

    if (pipes[0] > 0 || pipes[1] > 0) {
        VM_ASSERT(pipes[0] > 0);
        VM_ASSERT(pipes[1] > 0);
        return;
    }

    /*
     * Don't bother with eventfd on ancient Linux 2.6.22..2.6.26, which was
     * missing the EFD_* flags; those kernels can fall back to a pipe.
     */
#if USE_EVENTFD && defined(EFD_NONBLOCK) && defined(EFD_CLOEXEC)
    pipes[0] = pipes[1] = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
    if (pipes[0] >= 0) {
        rb_update_max_fd(pipes[0]);
        return;
    }
#endif

    err = rb_cloexec_pipe(pipes);
    if (err != 0) {
        rb_bug("can not create communication pipe");
    }
    rb_update_max_fd(pipes[0]);
    rb_update_max_fd(pipes[1]);
    set_nonblock(pipes[0]);
    set_nonblock(pipes[1]);
}
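/*
 * When USE_EVENTFD is in effect, both slots of the pair hold the same
 * descriptor: an eventfd is a single fd backed by a 64-bit kernel counter,
 * so a wakeup is a write of a uint64_t and draining it is a read of one
 * (see eventfd(2) and signal_communication_pipe() below). That is also why
 * close_invalidate_pair() is careful not to close the descriptor twice.
 * A rough sketch of the wakeup under these assumptions:
 *
 *     uint64_t one = 1;
 *     write(timer_th.comm_fds[1], &one, sizeof(one)); // counter += 1
 */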
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
#endif

enum {
    THREAD_NAME_MAX =
#if defined(__linux__)
    16
#elif defined(__APPLE__)
/* Undocumented, and main thread seems unlimited */
    64
#else
    16
#endif
};

static VALUE threadptr_invoke_proc_location(rb_thread_t *th);

static void
native_set_thread_name(rb_thread_t *th)
{
#ifdef SET_CURRENT_THREAD_NAME
    VALUE loc;
    if (!NIL_P(loc = th->name)) {
        SET_CURRENT_THREAD_NAME(RSTRING_PTR(loc));
    }
    else if ((loc = threadptr_invoke_proc_location(th)) != Qnil) {
        char *name, *p;
        char buf[THREAD_NAME_MAX];
        size_t len;
        int n;

        name = RSTRING_PTR(RARRAY_AREF(loc, 0));
        p = strrchr(name, '/'); /* show only the basename of the path */
        if (p && p[1])
            name = p + 1;

        n = snprintf(buf, sizeof(buf), "%s:%d", name, NUM2INT(RARRAY_AREF(loc, 1)));
        RB_GC_GUARD(loc);

        len = (size_t)n;
        if (len >= sizeof(buf)) {
            buf[sizeof(buf)-2] = '*';
            buf[sizeof(buf)-1] = '\0';
        }
        SET_CURRENT_THREAD_NAME(buf);
    }
#endif
}

static void
native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
{
#if defined SET_ANOTHER_THREAD_NAME || defined SET_CURRENT_THREAD_NAME
    char buf[THREAD_NAME_MAX];
    const char *s = "";
# if !defined SET_ANOTHER_THREAD_NAME
    if (!pthread_equal(pthread_self(), thread_id)) return;
# endif
    if (!NIL_P(name)) {
        long n;
        RSTRING_GETMEM(name, s, n);
        if (n >= (int)sizeof(buf)) {
            memcpy(buf, s, sizeof(buf)-1);
            buf[sizeof(buf)-1] = '\0';
            s = buf;
        }
    }
# if defined SET_ANOTHER_THREAD_NAME
    SET_ANOTHER_THREAD_NAME(thread_id, s);
# elif defined SET_CURRENT_THREAD_NAME
    SET_CURRENT_THREAD_NAME(s);
# endif
#endif
}

#if defined(RB_THREAD_T_HAS_NATIVE_ID) || defined(__APPLE__)
static VALUE
native_thread_native_thread_id(rb_thread_t *target_th)
{
    if (!target_th->nt) return Qnil;

#ifdef RB_THREAD_T_HAS_NATIVE_ID
    int tid = target_th->nt->tid;
    if (tid == 0) return Qnil;
    return INT2FIX(tid);
#elif defined(__APPLE__)
    uint64_t tid;
    /* The first condition is needed because MAC_OS_X_VERSION_10_6
     * is not defined on 10.5, and while __POWERPC__ takes care of ppc/ppc64,
     * i386 will be broken without this. Note, 10.5 is supported with GCC upstream,
     * so it has C++17 and everything needed to build modern Ruby. */
# if (!defined(MAC_OS_X_VERSION_10_6) || \
      (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_6) || \
      defined(__POWERPC__) /* never defined for PowerPC platforms */)
    const bool no_pthread_threadid_np = true;
#   define NO_PTHREAD_MACH_THREAD_NP 1
# elif MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_6
    const bool no_pthread_threadid_np = false;
# else
#   if !(defined(__has_attribute) && __has_attribute(availability))
    /* __API_AVAILABLE macro does nothing on gcc */
    __attribute__((weak)) int pthread_threadid_np(pthread_t, uint64_t*);
#   endif
    /* Check weakly linked symbol */
    const bool no_pthread_threadid_np = !&pthread_threadid_np;
# endif
    if (no_pthread_threadid_np) {
        return ULL2NUM(pthread_mach_thread_np(pthread_self()));
    }
# ifndef NO_PTHREAD_MACH_THREAD_NP
    int e = pthread_threadid_np(target_th->nt->thread_id, &tid);
    if (e != 0) rb_syserr_fail(e, "pthread_threadid_np");
    return ULL2NUM((unsigned long long)tid);
# endif
#endif
}
# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 1
#else
# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 0
#endif

static struct {
    rb_serial_t created_fork_gen;
    pthread_t pthread_id;

    int comm_fds[2]; // r, w

#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
    int event_fd; // kernel event queue fd (epoll/kqueue)
#endif
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
#define EPOLL_EVENTS_MAX 0x10
    struct epoll_event finished_events[EPOLL_EVENTS_MAX];
#elif HAVE_SYS_EVENT_H && USE_MN_THREADS
#define KQUEUE_EVENTS_MAX 0x10
    struct kevent finished_events[KQUEUE_EVENTS_MAX];
#endif

    // waiting threads list
    struct ccan_list_head waiting; // waiting threads in ractors
    pthread_mutex_t waiting_lock;
} timer_th = {
    .created_fork_gen = 0,
};

#define TIMER_THREAD_CREATED_P() (timer_th.created_fork_gen == current_fork_gen)

static void timer_thread_check_timeslice(rb_vm_t *vm);
static int timer_thread_set_timeout(rb_vm_t *vm);
static void timer_thread_wakeup_thread(rb_thread_t *th);

#include "thread_pthread_mn.c"
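/*
 * timer_th above describes the process-wide timer thread. Judging from
 * the code that follows, it is responsible for at least: delivering
 * time-slice interrupts to running threads, forwarding buffered signals
 * to the main thread, re-signaling threads on the UBF list, and waking
 * M:N threads whose timeout or I/O readiness arrived (via the
 * epoll/kqueue machinery in thread_pthread_mn.c).
 */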
static int
timer_thread_set_timeout(rb_vm_t *vm)
{
#if 0
    return 10; // ms
#else
    int timeout = -1;

    ractor_sched_lock(vm, NULL);
    {
        if (   !ccan_list_empty(&vm->ractor.sched.timeslice_threads) // (1-1) Provide time slice for active NTs
            || !ubf_threads_empty()                                  // (1-3) Periodic UBF
            || vm->ractor.sched.grq_cnt > 0                          // (1-4) Lazy GRQ deq start
            ) {

            RUBY_DEBUG_LOG("timeslice:%d ubf:%d grq:%d",
                           !ccan_list_empty(&vm->ractor.sched.timeslice_threads),
                           !ubf_threads_empty(),
                           (vm->ractor.sched.grq_cnt > 0));

            timeout = 10; // ms
            vm->ractor.sched.timeslice_wait_inf = false;
        }
        else {
            vm->ractor.sched.timeslice_wait_inf = true;
        }
    }
    ractor_sched_unlock(vm, NULL);

    if (vm->ractor.sched.timeslice_wait_inf) {
        rb_native_mutex_lock(&timer_th.waiting_lock);
        {
            rb_thread_t *th = ccan_list_top(&timer_th.waiting, rb_thread_t, sched.waiting_reason.node);
            if (th && (th->sched.waiting_reason.flags & thread_sched_waiting_timeout)) {
                rb_hrtime_t now = rb_hrtime_now();
                rb_hrtime_t hrrel = rb_hrtime_sub(th->sched.waiting_reason.data.timeout, now);

                RUBY_DEBUG_LOG("th:%u now:%lu rel:%lu", rb_th_serial(th), (unsigned long)now, (unsigned long)hrrel);

                // TODO: overflow?
                timeout = (int)((hrrel + RB_HRTIME_PER_MSEC - 1) / RB_HRTIME_PER_MSEC); // ms, rounded up
            }
        }
        rb_native_mutex_unlock(&timer_th.waiting_lock);
    }

    RUBY_DEBUG_LOG("timeout:%d inf:%d", timeout, (int)vm->ractor.sched.timeslice_wait_inf);

    // fprintf(stderr, "timeout:%d\n", timeout);
    return timeout;
#endif
}

static void
timer_thread_check_signal(rb_vm_t *vm)
{
    // ruby_sigchld_handler(vm); TODO

    int signum = rb_signal_buff_size();
    if (UNLIKELY(signum > 0) && vm->ractor.main_thread) {
        RUBY_DEBUG_LOG("signum:%d", signum);
        threadptr_trap_interrupt(vm->ractor.main_thread);
    }
}

static bool
timer_thread_check_exceed(rb_hrtime_t abs, rb_hrtime_t now)
{
    if (abs < now) {
        return true;
    }
    else if (abs - now < RB_HRTIME_PER_MSEC) {
        return true; // remaining time is less than 1 ms; treat as expired
    }
    else {
        return false;
    }
}

static rb_thread_t *
timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now)
{
    rb_thread_t *th = ccan_list_top(&timer_th.waiting, rb_thread_t, sched.waiting_reason.node);

    if (th != NULL &&
        (th->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
        timer_thread_check_exceed(th->sched.waiting_reason.data.timeout, now)) {

        RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(th));

        // delete from waiting list
        ccan_list_del_init(&th->sched.waiting_reason.node);

        // setup result
        th->sched.waiting_reason.flags = thread_sched_waiting_none;
        th->sched.waiting_reason.data.result = 0;

        return th;
    }

    return NULL;
}

static void
timer_thread_wakeup_thread(rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
    struct rb_thread_sched *sched = TH_SCHED(th);

    thread_sched_lock(sched, th);
    {
        if (sched->running != th) {
            thread_sched_to_ready_common(sched, th, true, false);
        }
        else {
            // the thread will release the execution right by itself soon
        }
    }
    thread_sched_unlock(sched, th);
}

static void
timer_thread_check_timeout(rb_vm_t *vm)
{
    rb_hrtime_t now = rb_hrtime_now();
    rb_thread_t *th;

    rb_native_mutex_lock(&timer_th.waiting_lock);
    {
        while ((th = timer_thread_deq_wakeup(vm, now)) != NULL) {
            timer_thread_wakeup_thread(th);
        }
    }
    rb_native_mutex_unlock(&timer_th.waiting_lock);
}

static void
timer_thread_check_timeslice(rb_vm_t *vm)
{
    // TODO: check time
    rb_thread_t *th;
    ccan_list_for_each(&vm->ractor.sched.timeslice_threads, th, sched.node.timeslice_threads) {
        RUBY_DEBUG_LOG("timeslice th:%u", rb_th_serial(th));
        RUBY_VM_SET_TIMER_INTERRUPT(th->ec);
    }
}

void
rb_assert_sig(void)
{
    sigset_t oldmask;
    pthread_sigmask(0, NULL, &oldmask);
    if (sigismember(&oldmask, SIGVTALRM)) {
        rb_bug("!!!");
    }
    else {
        RUBY_DEBUG_LOG("ok");
    }
}
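/*
 * timer_thread_func() below is the timer thread's main loop: it forwards
 * buffered signals to the main thread, wakes threads whose deadlines have
 * passed, re-signals UBF candidates, and then blocks in
 * timer_thread_polling() (defined in thread_pthread_mn.c), presumably for
 * at most the timer_thread_set_timeout() interval computed above.
 *
 * A worked example of that timeout arithmetic, assuming hrtime counts
 * nanoseconds (RB_HRTIME_PER_MSEC == 1,000,000): if the nearest deadline
 * is 2,300,000 ns away, (2,300,000 + 1,000,000 - 1) / 1,000,000 == 3, so
 * the poll sleeps for 3 ms; rounding up avoids waking just before the
 * deadline and spinning.
 */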
static void *
timer_thread_func(void *ptr)
{
    rb_vm_t *vm = (rb_vm_t *)ptr;
#if defined(RUBY_NT_SERIAL)
    ruby_nt_serial = (rb_atomic_t)-1;
#endif

    RUBY_DEBUG_LOG("started%s", "");

    while (system_working) {
        timer_thread_check_signal(vm);
        timer_thread_check_timeout(vm);
        ubf_wakeup_all_threads();

        RUBY_DEBUG_LOG("system_working:%d", system_working);
        timer_thread_polling(vm);
    }

    RUBY_DEBUG_LOG("terminated");
    return NULL;
}

/* only use signal-safe system calls here */
static void
signal_communication_pipe(int fd)
{
#if USE_EVENTFD
    const uint64_t buff = 1;
#else
    const char buff = '!';
#endif
    ssize_t result;

    /* write only when the pipe is already opened */
    if (fd >= 0) {
      retry:
        if ((result = write(fd, &buff, sizeof(buff))) <= 0) {
            int e = errno;
            switch (e) {
              case EINTR:
                goto retry;
              case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
              case EWOULDBLOCK:
#endif
                break;
              default:
                async_bug_fd("rb_thread_wakeup_timer_thread: write", e, fd);
            }
        }
        if (TT_DEBUG) WRITE_CONST(2, "rb_thread_wakeup_timer_thread: write\n");
    }
    else {
        // ignore wakeup
    }
}

static void
timer_thread_wakeup_force(void)
{
    // should not use RUBY_DEBUG_LOG() because it can be called within signal handlers.
    signal_communication_pipe(timer_th.comm_fds[1]);
}

static void
timer_thread_wakeup_locked(rb_vm_t *vm)
{
    // the ractor scheduler lock should be acquired before this call
    ASSERT_ractor_sched_locked(vm, NULL);

    if (timer_th.created_fork_gen == current_fork_gen) {
        if (vm->ractor.sched.timeslice_wait_inf) {
            RUBY_DEBUG_LOG("wakeup with fd:%d", timer_th.comm_fds[1]);
            timer_thread_wakeup_force();
        }
        else {
            RUBY_DEBUG_LOG("will be wakeup...");
        }
    }
}

static void
timer_thread_wakeup(void)
{
    rb_vm_t *vm = GET_VM();

    ractor_sched_lock(vm, NULL);
    {
        timer_thread_wakeup_locked(vm);
    }
    ractor_sched_unlock(vm, NULL);
}

static void
rb_thread_create_timer_thread(void)
{
    rb_serial_t created_fork_gen = timer_th.created_fork_gen;

    RUBY_DEBUG_LOG("fork_gen create:%d current:%d", (int)created_fork_gen, (int)current_fork_gen);

    timer_th.created_fork_gen = current_fork_gen;

    if (created_fork_gen != current_fork_gen) {
        if (created_fork_gen != 0) {
            RUBY_DEBUG_LOG("forked child process");

            CLOSE_INVALIDATE_PAIR(timer_th.comm_fds);
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
            close_invalidate(&timer_th.event_fd, "close event_fd");
#endif
            rb_native_mutex_destroy(&timer_th.waiting_lock);
        }

        ccan_list_head_init(&timer_th.waiting);
        rb_native_mutex_initialize(&timer_th.waiting_lock);

        // open communication channel
        setup_communication_pipe_internal(timer_th.comm_fds);

        // open event fd
        timer_thread_setup_mn();
    }

    pthread_create(&timer_th.pthread_id, NULL, timer_thread_func, GET_VM());
}

static int
native_stop_timer_thread(void)
{
    int stopped;
    stopped = --system_working <= 0;

    if (stopped) {
        RUBY_DEBUG_LOG("wakeup send %d", timer_th.comm_fds[1]);
        timer_thread_wakeup_force();
        RUBY_DEBUG_LOG("wakeup sent");
        pthread_join(timer_th.pthread_id, NULL);
    }

    if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
    return stopped;
}

static void
native_reset_timer_thread(void)
{
    //
}
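/*
 * Fork note: timer_th.created_fork_gen records the fork generation in
 * which the timer thread was created. After fork(2) only the calling
 * thread survives in the child, so when rb_thread_create_timer_thread()
 * sees a stale generation it discards the inherited communication fds and
 * mutex and rebuilds them before spawning a fresh timer thread.
 * rb_reserved_fd_p() below performs the same generation check so that a
 * stale fd is not treated as reserved in the child.
 */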
#ifdef HAVE_SIGALTSTACK
int
ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
{
    void *base;
    size_t size;
    const size_t water_mark = 1024 * 1024;

    STACK_GROW_DIR_DETECTION;

#ifdef STACKADDR_AVAILABLE
    if (get_stack(&base, &size) == 0) {
# ifdef __APPLE__
        if (pthread_equal(th->nt->thread_id, native_main_thread.id)) {
            struct rlimit rlim;
            if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur > size) {
                size = (size_t)rlim.rlim_cur;
            }
        }
# endif
        base = (char *)base + STACK_DIR_UPPER(+size, -size);
    }
    else
#endif
    if (th) {
        size = th->ec->machine.stack_maxsize;
        base = (char *)th->ec->machine.stack_start - STACK_DIR_UPPER(0, size);
    }
    else {
        return 0;
    }

    size /= RUBY_STACK_SPACE_RATIO;
    if (size > water_mark) size = water_mark;

    if (IS_STACK_DIR_UPPER()) {
        if (size > ~(size_t)base+1) size = ~(size_t)base+1;
        if (addr > base && addr <= (void *)((char *)base + size)) return 1;
    }
    else {
        if (size > (size_t)base) size = (size_t)base;
        if (addr > (void *)((char *)base - size) && addr <= base) return 1;
    }
    return 0;
}
#endif

int
rb_reserved_fd_p(int fd)
{
    /* no false-positive if out-of-FD at startup */
    if (fd < 0) return 0;

    if (fd == timer_th.comm_fds[0] ||
        fd == timer_th.comm_fds[1]
#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
        || fd == timer_th.event_fd
#endif
        ) {
        goto check_fork_gen;
    }
    return 0;

  check_fork_gen:
    if (timer_th.created_fork_gen == current_fork_gen) {
        /* async-signal-safe */
        return 1;
    }
    else {
        return 0;
    }
}

rb_nativethread_id_t
rb_nativethread_self(void)
{
    return pthread_self();
}

#if defined(USE_POLL) && !defined(HAVE_PPOLL)
/* TODO: don't ignore sigmask */
static int
ruby_ppoll(struct pollfd *fds, nfds_t nfds,
           const struct timespec *ts, const sigset_t *sigmask)
{
    int timeout_ms;

    if (ts) {
        int tmp, tmp2;

        if (ts->tv_sec > INT_MAX/1000)
            timeout_ms = INT_MAX;
        else {
            tmp = (int)(ts->tv_sec * 1000);
            /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
            tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
            if (INT_MAX - tmp < tmp2)
                timeout_ms = INT_MAX;
            else
                timeout_ms = (int)(tmp + tmp2);
        }
    }
    else
        timeout_ms = -1;

    return poll(fds, nfds, timeout_ms);
}
# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
#endif

/*
 * Single CPU setups benefit from explicit sched_yield() before ppoll(),
 * since threads may be too starved to enter the GVL waitqueue for
 * us to detect contention. Instead, we want to kick other threads
 * so they can run and possibly prevent us from entering slow paths
 * in ppoll() or similar syscalls.
 *
 * Confirmed on FreeBSD 11.2 and Linux 4.19.
 * [ruby-core:90417] [Bug #15398]
 */
#define THREAD_BLOCKING_YIELD(th) do { \
    const rb_thread_t *next_th; \
    struct rb_thread_sched *sched = TH_SCHED(th); \
    RB_VM_SAVE_MACHINE_CONTEXT(th); \
    thread_sched_to_waiting(sched, (th)); \
    next_th = sched->running; \
    rb_native_mutex_unlock(&sched->lock_); \
    native_thread_yield(); /* TODO: needed? */ \
    if (!next_th && rb_ractor_living_thread_num(th->ractor) > 1) { \
        native_thread_yield(); \
    }
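/*
 * Note that THREAD_BLOCKING_YIELD() deliberately leaves its "do {" block
 * open; the matching THREAD_BLOCKING_END() at each call site is expected
 * to supply the closing "} while (0)", mirroring how THREAD_BLOCKING_BEGIN()
 * and THREAD_BLOCKING_END() bracket the wait in native_cond_sleep() above.
 */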
static void
native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
{
    struct rb_thread_sched *sched = TH_SCHED(th);

    RUBY_DEBUG_LOG("rel:%d", rel ? (int)*rel : 0);
    if (rel) {
        if (th_has_dedicated_nt(th)) {
            native_cond_sleep(th, rel);
        }
        else {
            thread_sched_wait_events(sched, th, -1, thread_sched_waiting_timeout, rel);
        }
    }
    else {
        thread_sched_to_waiting_until_wakeup(sched, th);
    }

    RUBY_DEBUG_LOG("wakeup");
}

// thread internal event hooks (only for pthread)

struct rb_internal_thread_event_hook {
    rb_internal_thread_event_callback callback;
    rb_event_flag_t event;
    void *user_data;

    struct rb_internal_thread_event_hook *next;
};

static pthread_rwlock_t rb_internal_thread_event_hooks_rw_lock = PTHREAD_RWLOCK_INITIALIZER;

rb_internal_thread_event_hook_t *
rb_internal_thread_add_event_hook(rb_internal_thread_event_callback callback, rb_event_flag_t internal_event, void *user_data)
{
    rb_internal_thread_event_hook_t *hook = ALLOC_N(rb_internal_thread_event_hook_t, 1);
    hook->callback = callback;
    hook->user_data = user_data;
    hook->event = internal_event;

    int r;
    if ((r = pthread_rwlock_wrlock(&rb_internal_thread_event_hooks_rw_lock))) {
        rb_bug_errno("pthread_rwlock_wrlock", r);
    }

    hook->next = rb_internal_thread_event_hooks;
    ATOMIC_PTR_EXCHANGE(rb_internal_thread_event_hooks, hook);

    if ((r = pthread_rwlock_unlock(&rb_internal_thread_event_hooks_rw_lock))) {
        rb_bug_errno("pthread_rwlock_unlock", r);
    }
    return hook;
}

bool
rb_internal_thread_remove_event_hook(rb_internal_thread_event_hook_t * hook)
{
    int r;
    if ((r = pthread_rwlock_wrlock(&rb_internal_thread_event_hooks_rw_lock))) {
        rb_bug_errno("pthread_rwlock_wrlock", r);
    }

    bool success = FALSE;

    if (rb_internal_thread_event_hooks == hook) {
        ATOMIC_PTR_EXCHANGE(rb_internal_thread_event_hooks, hook->next);
        success = TRUE;
    }
    else {
        rb_internal_thread_event_hook_t *h = rb_internal_thread_event_hooks;

        do {
            if (h->next == hook) {
                h->next = hook->next;
                success = TRUE;
                break;
            }
        } while ((h = h->next));
    }

    if ((r = pthread_rwlock_unlock(&rb_internal_thread_event_hooks_rw_lock))) {
        rb_bug_errno("pthread_rwlock_unlock", r);
    }

    if (success) {
        ruby_xfree(hook);
    }
    return success;
}

static void
rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th)
{
    int r;
    if ((r = pthread_rwlock_rdlock(&rb_internal_thread_event_hooks_rw_lock))) {
        rb_bug_errno("pthread_rwlock_rdlock", r);
    }

    if (rb_internal_thread_event_hooks) {
        rb_internal_thread_event_hook_t *h = rb_internal_thread_event_hooks;
        do {
            if (h->event & event) {
                rb_internal_thread_event_data_t event_data = {
                    .thread = th->self,
                };
                (*h->callback)(event, &event_data, h->user_data);
            }
        } while ((h = h->next));
    }

    if ((r = pthread_rwlock_unlock(&rb_internal_thread_event_hooks_rw_lock))) {
        rb_bug_errno("pthread_rwlock_unlock", r);
    }
}

#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */
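/*
 * Illustrative use of the event hook API above (a sketch, not part of
 * this file; `count_resumed` and `resumed` are hypothetical names): an
 * extension could count how often threads get scheduled.
 *
 *     static rb_atomic_t resumed;
 *
 *     static void
 *     count_resumed(rb_event_flag_t event,
 *                   const rb_internal_thread_event_data_t *data, void *user)
 *     {
 *         ATOMIC_INC(resumed); // may fire on any native thread
 *     }
 *
 *     rb_internal_thread_event_hook_t *hook =
 *         rb_internal_thread_add_event_hook(count_resumed,
 *                                           RUBY_INTERNAL_THREAD_EVENT_RESUMED,
 *                                           NULL);
 *     // ... later ...
 *     rb_internal_thread_remove_event_hook(hook);
 */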