M:N thread scheduler for Ractors

This patch introduces an M:N thread scheduler for the Ractor system.

In general, an M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
In the Ruby interpreter, one native thread is provided per Ractor,
and all Ruby threads of that Ractor are managed by that native thread.
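
For illustration only (not part of this patch), a minimal Ruby sketch of the
model described above: the Ruby threads created inside the non-main Ractor
below are user-level threads multiplexed by the scheduler rather than each
owning a dedicated native thread.

    # illustration: several Ruby threads inside one Ractor
    r = Ractor.new do
      threads = 4.times.map { |i| Thread.new(i) { |n| n * n } }
      threads.sum(&:value)            # 0 + 1 + 4 + 9
    end
    p r.take                          #=> 14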

Since Ruby 1.9, the interpreter has used a 1:1 thread scheduler, which means
each Ruby thread has its own native thread. The M:N scheduler changes this strategy.

Because of compatibility issues (and stability concerns about the implementation),
the main Ractor does not use the M:N scheduler by default. In other words,
threads on the main Ractor are managed with the 1:1 thread scheduler.
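
As an illustrative sketch (not part of the patch; based on the spec and test
changes below), the difference is observable through `Thread#native_thread_id`:
under the 1:1 scheduler a started thread reports an Integer, while under the
M:N scheduler the value may be nil when no native thread is currently assigned.

    t = Thread.new { sleep }
    Thread.pass until t.stop?
    p t.native_thread_id   # Integer on the 1:1 scheduler; may be nil under M:N
    t.kill
    t.join
    p t.native_thread_id   #=> nil (thread has terminated)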

There are additional settings by environment variables:

`RUBY_MN_THREADS=1` enables the M:N thread scheduler on the main Ractor.
Note that non-main Ractors use the M:N scheduler without this
configuration. With this configuration, single-Ractor applications
run their threads on an M:1 thread scheduler (green threads, user-level threads).

`RUBY_MAX_CPU=n` specifies the maximum number of native threads for
the M:N scheduler (default: 8).
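
For illustration, both settings can be combined when launching a program
(`app.rb` is just a placeholder name):

    $ RUBY_MN_THREADS=1 RUBY_MAX_CPU=4 ruby app.rb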

This patch will be reverted soon if hard-to-fix issues are found.

[Bug #19842]
Koichi Sasada 2023-04-10 10:53:13 +09:00
Parent 096ee0648e
Commit be1bbd5b7d
27 changed files with 3580 additions and 1524 deletions


@ -242,6 +242,20 @@ assert_equal 'true', %{
end
}
assert_equal 'true', %{
Thread.new{}.join
begin
Process.waitpid2 fork{
Thread.new{
sleep 0.1
}.join
}
true
rescue NotImplementedError
true
end
}
assert_equal 'ok', %{
open("zzz_t1.rb", "w") do |f|
f.puts <<-END


@ -15323,6 +15323,7 @@ ruby.$(OBJEXT): $(top_srcdir)/internal/ruby_parser.h
ruby.$(OBJEXT): $(top_srcdir)/internal/serial.h
ruby.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
ruby.$(OBJEXT): $(top_srcdir)/internal/string.h
ruby.$(OBJEXT): $(top_srcdir)/internal/thread.h
ruby.$(OBJEXT): $(top_srcdir)/internal/variable.h
ruby.$(OBJEXT): $(top_srcdir)/internal/vm.h
ruby.$(OBJEXT): $(top_srcdir)/internal/warnings.h
@ -17536,6 +17537,7 @@ thread.$(OBJEXT): $(top_srcdir)/internal/time.h
thread.$(OBJEXT): $(top_srcdir)/internal/variable.h
thread.$(OBJEXT): $(top_srcdir)/internal/vm.h
thread.$(OBJEXT): $(top_srcdir)/internal/warnings.h
thread.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
thread.$(OBJEXT): {$(VPATH)}assert.h
thread.$(OBJEXT): {$(VPATH)}atomic.h
thread.$(OBJEXT): {$(VPATH)}backward/2/assume.h
@ -17730,6 +17732,7 @@ thread.$(OBJEXT): {$(VPATH)}thread.h
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
thread.$(OBJEXT): {$(VPATH)}thread_native.h
thread.$(OBJEXT): {$(VPATH)}thread_pthread_mn.c
thread.$(OBJEXT): {$(VPATH)}thread_sync.c
thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc
thread.$(OBJEXT): {$(VPATH)}timev.h


@ -1342,6 +1342,8 @@ AC_CHECK_HEADERS(syscall.h)
AC_CHECK_HEADERS(time.h)
AC_CHECK_HEADERS(ucontext.h)
AC_CHECK_HEADERS(utime.h)
AC_CHECK_HEADERS(sys/epoll.h)
AS_CASE("$target_cpu", [x64|x86_64|i[3-6]86*], [
AC_CHECK_HEADERS(x86intrin.h)
])


@ -443,6 +443,10 @@ setup_debug_log(void)
(ruby_debug_log_mode & ruby_debug_log_memory) ? "[mem]" : "",
(ruby_debug_log_mode & ruby_debug_log_stderr) ? "[stderr]" : "",
(ruby_debug_log_mode & ruby_debug_log_file) ? "[file]" : "");
if (debug_log.output_file[0]) {
fprintf(stderr, "RUBY_DEBUG_LOG filename=%s\n", debug_log.output_file);
}
rb_nativethread_lock_initialize(&debug_log.lock);
setup_debug_log_filter();
@ -609,10 +613,11 @@ ruby_debug_log(const char *file, int line, const char *func_name, const char *fm
// ractor information
if (ruby_single_main_ractor == NULL) {
rb_ractor_t *cr = th ? th->ractor : NULL;
rb_vm_t *vm = GET_VM();
if (r && len < MAX_DEBUG_LOG_MESSAGE_LEN) {
r = snprintf(buff + len, MAX_DEBUG_LOG_MESSAGE_LEN - len, "\tr:#%d/%u",
cr ? (int)rb_ractor_id(cr) : -1, GET_VM()->ractor.cnt);
r = snprintf(buff + len, MAX_DEBUG_LOG_MESSAGE_LEN - len, "\tr:#%d/%u (%u)",
cr ? (int)rb_ractor_id(cr) : -1, vm->ractor.cnt, vm->ractor.sched.running_cnt);
if (r < 0) rb_bug("ruby_debug_log returns %d", r);
len += r;

dir.c | 4

@ -805,7 +805,7 @@ dir_read(VALUE dir)
struct dirent *dp;
GetDIR(dir, dirp);
errno = 0;
rb_errno_set(0);
if ((dp = READDIR(dirp->dir, dirp->enc)) != NULL) {
return rb_external_str_new_with_enc(dp->d_name, NAMLEN(dp), dirp->enc);
}
@ -1723,7 +1723,7 @@ nogvl_opendir_at(void *ptr)
/* fallthrough*/
case 0:
if (fd >= 0) close(fd);
errno = e;
rb_errno_set(e);
}
}
#else /* !USE_OPENDIR_AT */

eval.c | 18

@ -2110,3 +2110,21 @@ Init_eval(void)
id_signo = rb_intern_const("signo");
id_status = rb_intern_const("status");
}
int
rb_errno(void)
{
return *rb_orig_errno_ptr();
}
void
rb_errno_set(int e)
{
*rb_orig_errno_ptr() = e;
}
int *
rb_errno_ptr(void)
{
return rb_orig_errno_ptr();
}


@ -270,6 +270,24 @@ RBIMPL_ATTR_FORMAT(RBIMPL_PRINTF_FORMAT, 3, 0)
*/
int ruby_vsnprintf(char *str, size_t n, char const *fmt, va_list ap);
// TODO: doc
#include <errno.h>
int rb_errno(void);
void rb_errno_set(int);
int *rb_errno_ptr(void);
static inline int *
rb_orig_errno_ptr(void)
{
return &errno;
}
#define rb_orig_errno errno
#undef errno
#define errno (*rb_errno_ptr())
/** @cond INTERNAL_MACRO */
#if RBIMPL_HAS_WARNING("-Wgnu-zero-variadic-macro-arguments")
# /* Skip it; clang -pedantic doesn't like the following */


@ -50,6 +50,7 @@ void rb_mutex_allow_trap(VALUE self, int val);
VALUE rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data);
VALUE rb_mutex_owned_p(VALUE self);
VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g, VALUE h, ID mid);
void ruby_mn_threads_params(void);
int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);


@ -685,10 +685,16 @@ rb_last_status_set(int status, rb_pid_t pid)
GET_THREAD()->last_status = rb_process_status_new(pid, status, 0);
}
static void
last_status_clear(rb_thread_t *th)
{
th->last_status = Qnil;
}
void
rb_last_status_clear(void)
{
GET_THREAD()->last_status = Qnil;
last_status_clear(GET_THREAD());
}
static rb_pid_t
@ -1654,24 +1660,11 @@ before_exec(void)
before_exec_async_signal_safe();
}
/* This function should be async-signal-safe. Actually it is. */
static void
after_exec_async_signal_safe(void)
{
}
static void
after_exec_non_async_signal_safe(void)
{
rb_thread_reset_timer_thread();
rb_thread_start_timer_thread();
}
static void
after_exec(void)
{
after_exec_async_signal_safe();
after_exec_non_async_signal_safe();
rb_thread_reset_timer_thread();
rb_thread_start_timer_thread();
}
#if defined HAVE_WORKING_FORK || defined HAVE_DAEMON
@ -1686,10 +1679,14 @@ after_fork_ruby(rb_pid_t pid)
{
rb_threadptr_pending_interrupt_clear(GET_THREAD());
if (pid == 0) {
// child
clear_pid_cache();
rb_thread_atfork();
}
after_exec();
else {
// parent
after_exec();
}
}
#endif
@ -4210,16 +4207,19 @@ rb_fork_ruby2(struct rb_process_status *status)
while (1) {
prefork();
disable_child_handler_before_fork(&old);
before_fork_ruby();
pid = rb_fork();
err = errno;
if (status) {
status->pid = pid;
status->error = err;
disable_child_handler_before_fork(&old);
{
pid = rb_fork();
err = errno;
if (status) {
status->pid = pid;
status->error = err;
}
}
after_fork_ruby(pid);
disable_child_handler_fork_parent(&old); /* yes, bad name */
after_fork_ruby(pid);
if (pid >= 0) { /* fork succeed */
return pid;
@ -4663,11 +4663,16 @@ static VALUE
do_spawn_process(VALUE arg)
{
struct spawn_args *argp = (struct spawn_args *)arg;
rb_execarg_parent_start1(argp->execarg);
return (VALUE)rb_spawn_process(DATA_PTR(argp->execarg),
argp->errmsg.ptr, argp->errmsg.buflen);
}
NOINLINE(static rb_pid_t
rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen));
static rb_pid_t
rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen)
{
@ -4676,8 +4681,10 @@ rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen)
args.execarg = execarg_obj;
args.errmsg.ptr = errmsg;
args.errmsg.buflen = errmsg_buflen;
return (rb_pid_t)rb_ensure(do_spawn_process, (VALUE)&args,
execarg_parent_end, execarg_obj);
rb_pid_t r = (rb_pid_t)rb_ensure(do_spawn_process, (VALUE)&args,
execarg_parent_end, execarg_obj);
return r;
}
static rb_pid_t
@ -4820,13 +4827,14 @@ rb_spawn(int argc, const VALUE *argv)
static VALUE
rb_f_system(int argc, VALUE *argv, VALUE _)
{
rb_thread_t *th = GET_THREAD();
VALUE execarg_obj = rb_execarg_new(argc, argv, TRUE, TRUE);
struct rb_execarg *eargp = rb_execarg_get(execarg_obj);
struct rb_process_status status = {0};
eargp->status = &status;
rb_last_status_clear();
last_status_clear(th);
// This function can set the thread's last status.
// May be different from waitpid_state.pid on exec failure.
@ -4834,12 +4842,10 @@ rb_f_system(int argc, VALUE *argv, VALUE _)
if (pid > 0) {
VALUE status = rb_process_status_wait(pid, 0);
struct rb_process_status *data = rb_check_typeddata(status, &rb_process_status_type);
// Set the last status:
rb_obj_freeze(status);
GET_THREAD()->last_status = status;
th->last_status = status;
if (data->status == EXIT_SUCCESS) {
return Qtrue;

ractor.c | 178

@ -112,18 +112,16 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
static void
ractor_cond_wait(rb_ractor_t *r)
void
rb_ractor_lock_self(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
VALUE locked_by = r->sync.locked_by;
r->sync.locked_by = Qnil;
#endif
rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
RACTOR_LOCK_SELF(r);
}
#if RACTOR_CHECK_MODE > 0
r->sync.locked_by = locked_by;
#endif
void
rb_ractor_unlock_self(rb_ractor_t *r)
{
RACTOR_UNLOCK_SELF(r);
}
// Ractor status
@ -243,7 +241,9 @@ ractor_free(void *ptr)
rb_ractor_t *r = (rb_ractor_t *)ptr;
RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
rb_native_mutex_destroy(&r->sync.lock);
#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_destroy(&r->sync.cond);
#endif
ractor_queue_free(&r->sync.recv_queue);
ractor_queue_free(&r->sync.takers_queue);
ractor_local_storage_free(r);
@ -531,6 +531,19 @@ ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
}
#ifdef RUBY_THREAD_PTHREAD_H
// thread_*.c
void rb_ractor_sched_wakeup(rb_ractor_t *r);
#else
static void
rb_ractor_sched_wakeup(rb_ractor_t *r)
{
rb_native_cond_broadcast(&r->sync.cond);
}
#endif
static bool
ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
{
@ -544,7 +557,7 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra
if (ractor_sleeping_by(r, wait_status)) {
r->sync.wait.wakeup_status = wakeup_status;
rb_native_cond_broadcast(&r->sync.cond);
rb_ractor_sched_wakeup(r);
return true;
}
else {
@ -552,22 +565,6 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra
}
}
static void *
ractor_sleep_wo_gvl(void *ptr)
{
rb_ractor_t *cr = ptr;
RACTOR_LOCK_SELF(cr);
{
VM_ASSERT(cr->sync.wait.status != wait_none);
if (cr->sync.wait.wakeup_status == wakeup_none) {
ractor_cond_wait(cr);
}
cr->sync.wait.status = wait_none;
}
RACTOR_UNLOCK_SELF(cr);
return NULL;
}
static void
ractor_sleep_interrupt(void *ptr)
{
@ -582,34 +579,11 @@ ractor_sleep_interrupt(void *ptr)
typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
static enum rb_ractor_wakeup_status
ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
ractor_sleep_cleanup_function cf_func, void *cf_data)
static void
ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data)
{
enum rb_ractor_wakeup_status wakeup_status;
VM_ASSERT(GET_RACTOR() == cr);
// TODO: multi-threads
VM_ASSERT(cr->sync.wait.status == wait_none);
VM_ASSERT(wait_status != wait_none);
cr->sync.wait.status = wait_status;
cr->sync.wait.wakeup_status = wakeup_none;
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
// wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
RACTOR_UNLOCK(cr);
{
rb_nogvl(ractor_sleep_wo_gvl, cr,
ractor_sleep_interrupt, cr,
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
}
RACTOR_LOCK(cr);
// rb_nogvl() can be canceled by interrupts
if (cr->sync.wait.status != wait_none) {
enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status;
cr->sync.wait.status = wait_none;
cr->sync.wait.wakeup_status = wakeup_by_interrupt;
@ -632,8 +606,85 @@ ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_r
rb_thread_check_ints();
}
}
RACTOR_LOCK(cr); // reachable?
// reachable?
RACTOR_LOCK(cr);
cr->sync.wait.status = prev_wait_status;
}
}
#ifdef RUBY_THREAD_PTHREAD_H
void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
#else
// win32
static void
ractor_cond_wait(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
VALUE locked_by = r->sync.locked_by;
r->sync.locked_by = Qnil;
#endif
rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
#if RACTOR_CHECK_MODE > 0
r->sync.locked_by = locked_by;
#endif
}
static void *
ractor_sleep_wo_gvl(void *ptr)
{
rb_ractor_t *cr = ptr;
RACTOR_LOCK_SELF(cr);
{
VM_ASSERT(cr->sync.wait.status != wait_none);
if (cr->sync.wait.wakeup_status == wakeup_none) {
ractor_cond_wait(cr);
}
cr->sync.wait.status = wait_none;
}
RACTOR_UNLOCK_SELF(cr);
return NULL;
}
static void
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
{
RACTOR_UNLOCK(cr);
{
rb_nogvl(ractor_sleep_wo_gvl, cr,
ubf, cr,
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
}
RACTOR_LOCK(cr);
}
#endif
static enum rb_ractor_wakeup_status
ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
ractor_sleep_cleanup_function cf_func, void *cf_data)
{
enum rb_ractor_wakeup_status wakeup_status;
VM_ASSERT(GET_RACTOR() == cr);
// TODO: multi-threads
VM_ASSERT(cr->sync.wait.status == wait_none);
VM_ASSERT(wait_status != wait_none);
cr->sync.wait.status = wait_status;
cr->sync.wait.wakeup_status = wakeup_none;
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
// wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
while (cr->sync.wait.wakeup_status == wakeup_none) {
rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
ractor_check_ints(ec, cr, cf_func, cf_data);
}
cr->sync.wait.status = wait_none;
// TODO: multi-thread
wakeup_status = cr->sync.wait.wakeup_status;
@ -1943,7 +1994,7 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
}
#endif
void rb_thread_sched_init(struct rb_thread_sched *);
void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
void
rb_ractor_living_threads_init(rb_ractor_t *r)
@ -1959,11 +2010,15 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
ractor_queue_setup(&r->sync.recv_queue);
ractor_queue_setup(&r->sync.takers_queue);
rb_native_mutex_initialize(&r->sync.lock);
rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond);
#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond);
#endif
// thread management
rb_thread_sched_init(&r->threads.sched);
rb_thread_sched_init(&r->threads.sched, false);
rb_ractor_living_threads_init(r);
// naming
@ -2218,6 +2273,8 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c
}
}
void rb_threadptr_remove(rb_thread_t *th);
void
rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
{
@ -2225,6 +2282,8 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
rb_threadptr_remove(th);
if (cr->threads.cnt == 1) {
vm_remove_ractor(th->vm, cr);
}
@ -2327,6 +2386,9 @@ ractor_terminal_interrupt_all(rb_vm_t *vm)
}
}
void rb_add_running_thread(rb_thread_t *th);
void rb_del_running_thread(rb_thread_t *th);
void
rb_ractor_terminate_all(void)
{
@ -2354,7 +2416,9 @@ rb_ractor_terminate_all(void)
// wait for 1sec
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
ractor_terminal_interrupt_all(vm);


@ -103,7 +103,6 @@ struct rb_ractor_sync {
#if RACTOR_CHECK_MODE > 0
VALUE locked_by;
#endif
rb_nativethread_cond_t cond;
bool incoming_port_closed;
bool outgoing_port_closed;
@ -120,7 +119,12 @@ struct rb_ractor_sync {
struct ractor_wait {
enum rb_ractor_wait_status status;
enum rb_ractor_wakeup_status wakeup_status;
rb_thread_t *waiting_thread;
} wait;
#ifndef RUBY_THREAD_PTHREAD_H
rb_nativethread_cond_t cond;
#endif
};
// created
@ -310,11 +314,13 @@ static inline void
rb_ractor_set_current_ec_(rb_ractor_t *cr, rb_execution_context_t *ec, const char *file, int line)
{
#ifdef RB_THREAD_LOCAL_SPECIFIER
# ifdef __APPLE__
rb_current_ec_set(ec);
# else
ruby_current_ec = ec;
# endif
#else
native_tls_set(ruby_current_ec_key, ec);
#endif

ruby.c | 2

@ -53,6 +53,7 @@
#include "internal/loadpath.h"
#include "internal/missing.h"
#include "internal/object.h"
#include "internal/thread.h"
#include "internal/ruby_parser.h"
#include "internal/variable.h"
#include "ruby/encoding.h"
@ -2148,6 +2149,7 @@ process_options(int argc, char **argv, ruby_cmdline_options_t *opt)
#endif
ruby_gc_set_params();
ruby_mn_threads_params();
ruby_init_loadpath();
Init_enc();


@ -19,8 +19,15 @@ ruby_version_is "3.1" do
main_thread_id = Thread.current.native_thread_id
t_thread_id = t.native_thread_id
t_thread_id.should be_kind_of(Integer)
if ruby_version_is "3.3"
# native_thread_id can be nil on a M:N scheduler
t_thread_id.should be_kind_of(Integer) if t_thread_id != nil
else
t_thread_id.should be_kind_of(Integer)
end
main_thread_id.should_not == t_thread_id
t.run
t.join
t.native_thread_id.should == nil


@ -12,3 +12,7 @@ if /freebsd13/ =~ RUBY_PLATFORM
# /usr/home/chkbuild/chkbuild/tmp/build/20220216T143001Z/ruby/test/ruby/test_thread.rb:1390:in `test_signal_at_join'
exclude(:test_signal_at_join, 'gets stuck somewhere')
end
if /mswin/ =~ RUBY_PLATFORM && ENV.key?('GITHUB_ACTIONS')
# to avoid "`failed to allocate memory (NoMemoryError)" error
exclude(:test_thread_interrupt_for_killed_thread, 'TODO')
end


@ -1435,7 +1435,8 @@ q.pop
Thread.pass until th1.stop?
# After a thread starts (and execute `sleep`), it returns native_thread_id
assert_instance_of Integer, th1.native_thread_id
native_tid = th1.native_thread_id
assert_instance_of Integer, native_tid if native_tid # it can be nil
th1.wakeup
Thread.pass while th1.alive?

thread.c | 289

@ -147,7 +147,6 @@ static const char *thread_status_name(rb_thread_t *th, int detail);
static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
static int consume_communication_pipe(int fd);
static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
static volatile int system_working = 1;
@ -260,12 +259,6 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
static void
ubf_sigwait(void *ignore)
{
rb_thread_wakeup_timer_thread(0);
}
#include THREAD_IMPL_SRC
/*
@ -646,20 +639,13 @@ static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
STACK_GROW_DIR_DETECTION;
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
VM_ASSERT(th != th->vm->ractor.main_thread);
enum ruby_tag_type state;
VALUE errinfo = Qnil;
size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_thread_t *ractor_main_th = th->ractor->threads.main;
VALUE * vm_stack = NULL;
VM_ASSERT(th != th->vm->ractor.main_thread);
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
// setup native thread
thread_sched_to_running(TH_SCHED(th), th);
ruby_thread_set_native(th);
RUBY_DEBUG_LOG("got lock. th:%u", rb_th_serial(th));
// setup ractor
if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
@ -674,17 +660,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
RB_VM_UNLOCK();
}
// This assertion is not passed on win32 env. Check it later.
// VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
// setup VM and machine stack
vm_stack = alloca(size * sizeof(VALUE));
VM_ASSERT(vm_stack);
rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
// Ensure that we are not joinable.
VM_ASSERT(UNDEF_P(th->value));
@ -990,11 +965,11 @@ rb_thread_create(VALUE (*fn)(void *), void *arg)
}
VALUE
rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
{
struct thread_create_params params = {
.type = thread_invoke_type_ractor_proc,
.g = g,
.g = r,
.args = args,
.proc = proc,
};
@ -1375,14 +1350,14 @@ sleep_forever(rb_thread_t *th, unsigned int fl)
void
rb_thread_sleep_forever(void)
{
RUBY_DEBUG_LOG("");
RUBY_DEBUG_LOG("forever");
sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}
void
rb_thread_sleep_deadly(void)
{
RUBY_DEBUG_LOG("");
RUBY_DEBUG_LOG("deadly");
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}
@ -1394,7 +1369,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hr
rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
RUBY_DEBUG_LOG("");
RUBY_DEBUG_LOG("...");
if (end) {
sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
}
@ -1491,7 +1466,7 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
th->status = THREAD_STOPPED;
rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
RUBY_DEBUG_LOG("");
RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
RB_VM_SAVE_MACHINE_CONTEXT(th);
thread_sched_to_waiting(TH_SCHED(th), th);
@ -1519,8 +1494,12 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
th->status = region->prev_status;
}
RUBY_DEBUG_LOG("");
RUBY_DEBUG_LOG("end");
#ifndef _WIN32
// GET_THREAD() clears WSAGetLastError()
VM_ASSERT(th == GET_THREAD());
#endif
}
void *
@ -1544,14 +1523,11 @@ rb_nogvl(void *(*func)(void *), void *data1,
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
vm->ubf_async_safe = 1;
}
else {
ubf_th = rb_thread_start_unblock_thread();
}
}
BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
saved_errno = rb_errno();
}, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
if (is_main_thread) vm->ubf_async_safe = 0;
@ -1564,7 +1540,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
thread_value(rb_thread_kill(ubf_th));
}
errno = saved_errno;
rb_errno_set(saved_errno);
return val;
}
@ -1689,11 +1665,31 @@ rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
}
}
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
static int
waitfd_to_waiting_flag(int wfd_event)
{
return wfd_event << 1;
}
VALUE
rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
{
volatile VALUE val = Qundef; /* shouldn't be used */
rb_execution_context_t * volatile ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
#ifdef RUBY_THREAD_PTHREAD_H
if (events && !th_has_dedicated_nt(th)) {
VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
// wait readable/writable
thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
RUBY_VM_CHECK_INTS_BLOCKING(ec);
}
#endif
volatile VALUE val = Qundef; /* shouldn't be used */
volatile int saved_errno = 0;
enum ruby_tag_type state;
@ -1746,6 +1742,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
return val;
}
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
{
return rb_thread_io_blocking_call(func, data1, fd, 0);
}
/*
* rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
*
@ -2379,15 +2381,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
/* signal handling */
if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
enum rb_thread_status prev_status = th->status;
int sigwait_fd = rb_sigwait_fd_get(th);
if (sigwait_fd >= 0) {
(void)consume_communication_pipe(sigwait_fd);
rb_sigwait_fd_put(th, sigwait_fd);
}
th->status = THREAD_RUNNABLE;
while ((sig = rb_get_next_signal()) != 0) {
ret |= rb_signal_exec(th, sig);
{
while ((sig = rb_get_next_signal()) != 0) {
ret |= rb_signal_exec(th, sig);
}
}
th->status = prev_status;
}
@ -2432,7 +2431,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
limits_us >>= -th->priority;
if (th->status == THREAD_RUNNABLE)
th->running_time_us += TIME_QUANTUM_USEC;
th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
VM_ASSERT(th->ec->cfp);
EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
@ -3362,7 +3361,7 @@ rb_thread_setname(VALUE thread, VALUE name)
name = rb_str_new_frozen(name);
}
target_th->name = name;
if (threadptr_initialized(target_th)) {
if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
native_set_another_thread_name(target_th->nt->thread_id, name);
}
return name;
@ -4148,7 +4147,6 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
struct select_set {
int max;
int sigwait_fd;
rb_thread_t *th;
rb_fdset_t *rset;
rb_fdset_t *wset;
@ -4164,10 +4162,6 @@ select_set_free(VALUE p)
{
struct select_set *set = (struct select_set *)p;
if (set->sigwait_fd >= 0) {
rb_sigwait_fd_put(set->th, set->sigwait_fd);
}
rb_fd_term(&set->orig_rset);
rb_fd_term(&set->orig_wset);
rb_fd_term(&set->orig_eset);
@ -4175,24 +4169,6 @@ select_set_free(VALUE p)
return Qfalse;
}
static const rb_hrtime_t *
sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
int *drained_p)
{
static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
*drained_p = check_signals_nogvl(th, sigwait_fd);
if (!orig || *orig > quantum)
return &quantum;
}
return orig;
}
#define sigwait_signals_fd(result, cond, sigwait_fd) \
(result > 0 && (cond) ? (result--, (sigwait_fd)) : -1)
static VALUE
do_select(VALUE p)
{
@ -4211,28 +4187,18 @@ do_select(VALUE p)
TRUE)
do {
int drained;
lerrno = 0;
BLOCKING_REGION(set->th, {
const rb_hrtime_t *sto;
struct timeval tv;
sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
result = native_fd_select(set->max, set->rset, set->wset,
set->eset,
rb_hrtime2timeval(&tv, sto), set->th);
result = native_fd_select(set->max,
set->rset, set->wset, set->eset,
rb_hrtime2timeval(&tv, to), set->th);
if (result < 0) lerrno = errno;
}
}, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
if (set->sigwait_fd >= 0) {
int fd = sigwait_signals_fd(result,
rb_fd_isset(set->sigwait_fd, set->rset),
set->sigwait_fd);
(void)check_signals_nogvl(set->th, fd);
}
}, ubf_select, set->th, TRUE);
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
} while (wait_retryable(&result, lerrno, to, end) && do_select_update());
@ -4244,18 +4210,6 @@ do_select(VALUE p)
return (VALUE)result;
}
static rb_fdset_t *
init_set_fd(int fd, rb_fdset_t *fds)
{
if (fd < 0) {
return 0;
}
rb_fd_init(fds);
rb_fd_set(fd, fds);
return fds;
}
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout)
@ -4279,16 +4233,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0;
}
set.sigwait_fd = rb_sigwait_fd_get(set.th);
if (set.sigwait_fd >= 0) {
if (set.rset)
rb_fd_set(set.sigwait_fd, set.rset);
else
set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
if (set.sigwait_fd >= set.max) {
set.max = set.sigwait_fd + 1;
}
}
#define fd_init_copy(f) do { \
if (set.f) { \
rb_fd_resize(set.max - 1, set.f); \
@ -4325,19 +4269,35 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
int
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
struct pollfd fds[2];
struct pollfd fds[1];
int result = 0;
int drained;
nfds_t nfds;
rb_unblock_function_t *ubf;
struct waiting_fd wfd;
int state;
volatile int lerrno;
wfd.th = GET_THREAD();
rb_thread_t *th = wfd.th = GET_THREAD();
wfd.fd = fd;
wfd.busy = NULL;
#ifdef RUBY_THREAD_PTHREAD_H
if (!th->nt->dedicated) {
rb_hrtime_t rel, *prel;
if (timeout) {
rel = rb_timeval2hrtime(timeout);
prel = &rel;
}
else {
prel = NULL;
}
if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
return 0; // timeout
}
}
#endif
RB_VM_LOCK_ENTER();
{
ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
@ -4353,36 +4313,18 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
fds[0].events = (short)events;
fds[0].revents = 0;
do {
fds[1].fd = rb_sigwait_fd_get(wfd.th);
if (fds[1].fd >= 0) {
fds[1].events = POLLIN;
fds[1].revents = 0;
nfds = 2;
ubf = ubf_sigwait;
}
else {
nfds = 1;
ubf = ubf_select;
}
nfds = 1;
lerrno = 0;
BLOCKING_REGION(wfd.th, {
const rb_hrtime_t *sto;
struct timespec ts;
sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
if (result < 0) lerrno = errno;
}
}, ubf, wfd.th, TRUE);
}, ubf_select, wfd.th, TRUE);
if (fds[1].fd >= 0) {
int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd);
(void)check_signals_nogvl(wfd.th, fd1);
rb_sigwait_fd_put(wfd.th, fds[1].fd);
}
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
} while (wait_retryable(&result, lerrno, to, end));
}
@ -4470,6 +4412,18 @@ select_single_cleanup(VALUE ptr)
return (VALUE)-1;
}
static rb_fdset_t *
init_set_fd(int fd, rb_fdset_t *fds)
{
if (fd < 0) {
return 0;
}
rb_fd_init(fds);
rb_fd_set(fd, fds);
return fds;
}
int
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
@ -4552,16 +4506,13 @@ consume_communication_pipe(int fd)
ssize_t result;
int ret = FALSE; /* for rb_sigwait_sleep */
/*
* disarm UBF_TIMER before we read, because it can become
* re-armed at any time via sighandler and the pipe will refill
* We can disarm it because this thread is now processing signals
* and we do not want unnecessary SIGVTALRM
*/
ubf_timer_disarm();
while (1) {
result = read(fd, buff, sizeof(buff));
#if USE_EVENTFD
RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
#else
RUBY_DEBUG_LOG("result:%d", (int)result);
#endif
if (result > 0) {
ret = TRUE;
if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
@ -4588,24 +4539,6 @@ consume_communication_pipe(int fd)
}
}
static int
check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
{
rb_vm_t *vm = GET_VM(); /* th may be 0 */
int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
ubf_wakeup_all_threads();
if (rb_signal_buff_size()) {
if (th == vm->ractor.main_thread) {
/* no need to lock + wakeup if already in main thread */
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
else {
threadptr_trap_interrupt(vm->ractor.main_thread);
}
}
return ret;
}
void
rb_thread_stop_timer_thread(void)
{
@ -4702,6 +4635,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
rb_ractor_sleeper_threads_clear(th->ractor);
rb_clear_coverages();
// restart timer thread (timer threads access `vm->waitpid_lock` and so on).
rb_thread_reset_timer_thread();
rb_thread_start_timer_thread();
VM_ASSERT(vm->ractor.blocking_cnt == 0);
VM_ASSERT(vm->ractor.cnt == 1);
}
@ -5467,8 +5404,16 @@ Init_Thread(void)
/* main thread setting */
{
/* acquire global vm lock */
struct rb_thread_sched *sched = TH_SCHED(th);
thread_sched_to_running(sched, th);
#ifdef HAVE_PTHREAD_NP_H
VM_ASSERT(TH_SCHED(th)->running == th);
#endif
// thread_sched_to_running() should not be called because
// it assumes blocked by thread_sched_to_waiting().
// thread_sched_to_running(sched, th);
#ifdef RB_INTERNAL_THREAD_HOOK
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED);
#endif
th->pending_interrupt_queue = rb_ary_hidden_new(0);
th->pending_interrupt_queue_checked = 0;
@ -5481,7 +5426,7 @@ Init_Thread(void)
Init_thread_sync();
// TODO: Suppress unused function warning for now
if (0) rb_thread_sched_destroy(NULL);
// if (0) rb_thread_sched_destroy(NULL);
}
int
@ -5511,7 +5456,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
ccan_list_for_each(&r->threads.set, th, lt_node) {
rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
"native:%p int:%u",
th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
@ -5537,14 +5482,18 @@ rb_check_deadlock(rb_ractor_t *r)
{
if (GET_THREAD()->vm->thread_ignore_deadlock) return;
int found = 0;
rb_thread_t *th = NULL;
#ifdef RUBY_THREAD_PTHREAD_H
if (r->threads.sched.readyq_cnt > 0) return;
#endif
int sleeper_num = rb_ractor_sleeper_thread_num(r);
int ltnum = rb_ractor_living_thread_num(r);
if (ltnum > sleeper_num) return;
if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
if (patrol_thread && patrol_thread != GET_THREAD()) return;
int found = 0;
rb_thread_t *th = NULL;
ccan_list_for_each(&r->threads.set, th, lt_node) {
if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {

Просмотреть файл

@ -42,7 +42,7 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
}
void
rb_thread_sched_init(struct rb_thread_sched *sched)
rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
{
}
@ -134,6 +134,11 @@ Init_native_thread(rb_thread_t *main_th)
ruby_thread_set_native(main_th);
}
void
ruby_mn_threads_params(void)
{
}
static void
native_thread_destroy(rb_thread_t *th)
{
@ -276,9 +281,51 @@ native_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *e
return rb_fd_select(n, readfds, writefds, exceptfds, timeout);
}
static VALUE
rb_thread_start_unblock_thread(void)
static bool
th_has_dedicated_nt(const rb_thread_t *th)
{
return Qfalse;
return true;
}
void
rb_add_running_thread(rb_thread_t *th){
// do nothing
}
void
rb_del_running_thread(rb_thread_t *th)
{
// do nothing
}
void
rb_threadptr_sched_free(rb_thread_t *th)
{
// do nothing
}
void
rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)
{
// do nothing
}
void
rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
{
// do nothing
}
void
rb_threadptr_remove(rb_thread_t *th)
{
// do nothing
}
void
rb_thread_sched_mark_zombies(rb_vm_t *vm)
{
// do nothing
}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */


@ -16,5 +16,6 @@ struct rb_thread_sched_item {};
struct rb_thread_sched {};
RUBY_EXTERN struct rb_execution_context_struct *ruby_current_ec;
NOINLINE(struct rb_execution_context_struct *rb_current_ec_noinline(void)); // for assertions
#endif /* RUBY_THREAD_NONE_H */

The diff for this file is not shown because of its large size.


@ -19,14 +19,59 @@
// per-Thead scheduler helper data
struct rb_thread_sched_item {
union {
struct {
struct ccan_list_node ubf;
struct ccan_list_node readyq; // protected by sched->lock
// connected to ractor->threads.sched.readyq
// locked by ractor->threads.sched.lock
struct ccan_list_node readyq;
// connected to vm->ractor.sched.timeslice_threads
// locked by vm->ractor.sched.lock
struct ccan_list_node timeslice_threads;
// connected to vm->ractor.sched.running_threads
// locked by vm->ractor.sched.lock
struct ccan_list_node running_threads;
// connected to vm->ractor.sched.zombie_threads
struct ccan_list_node zombie_threads;
} node;
// this data should be protected by timer_th.waiting_lock
struct {
enum thread_sched_waiting_flag {
thread_sched_waiting_none = 0x00,
thread_sched_waiting_timeout = 0x01,
thread_sched_waiting_io_read = 0x02,
thread_sched_waiting_io_write = 0x08,
thread_sched_waiting_io_force = 0x40, // ignore readable
} flags;
struct {
// should be compat with hrtime.h
#ifdef MY_RUBY_BUILD_MAY_TIME_TRAVEL
int128_t timeout;
#else
uint64_t timeout;
#endif
int fd; // -1 for timeout only
int result;
} data;
// connected to timer_th.waiting
struct ccan_list_node node;
} waiting_reason;
bool finished;
bool malloc_stack;
void *context_stack;
struct coroutine_context *context;
};
struct rb_native_thread {
rb_atomic_t serial;
struct rb_vm_struct *vm;
rb_nativethread_id_t thread_id;
@ -54,6 +99,11 @@ struct rb_native_thread {
#ifdef USE_SIGALTSTACK
void *altstack;
#endif
struct coroutine_context *nt_context;
int dedicated;
size_t machine_stack_maxsize;
};
#undef except
@ -63,45 +113,35 @@ struct rb_native_thread {
// per-Ractor
struct rb_thread_sched {
/* fast path */
rb_nativethread_lock_t lock_;
#if VM_CHECK_MODE
struct rb_thread_struct *lock_owner;
#endif
struct rb_thread_struct *running; // running thread or NULL
bool is_running;
bool is_running_timeslice;
bool enable_mn_threads;
const struct rb_thread_struct *running; // running thread or NULL
rb_nativethread_lock_t lock;
/*
* slow path, protected by ractor->thread_sched->lock
* - @readyq - FIFO queue of threads waiting for running
* - @timer - it handles timeslices for @current. It is any one thread
* in @waitq, there is no @timer if @waitq is empty, but always
* a @timer if @waitq has entries
* - @timer_err tracks timeslice limit, the timeslice only resets
* when pthread_cond_timedwait returns ETIMEDOUT, so frequent
* switching between contended/uncontended GVL won't reset the
* timer.
*/
struct ccan_list_head readyq;
const struct rb_thread_struct *timer;
int timer_err;
/* yield */
rb_nativethread_cond_t switch_cond;
rb_nativethread_cond_t switch_wait_cond;
int need_yield;
int wait_yield;
int readyq_cnt;
// ractor scheduling
struct ccan_list_node grq_node;
};
#ifdef RB_THREAD_LOCAL_SPECIFIER
# ifdef __APPLE__
// on Darwin, TLS can not be accessed across .so
struct rb_execution_context_struct *rb_current_ec(void);
void rb_current_ec_set(struct rb_execution_context_struct *);
# else
RUBY_EXTERN RB_THREAD_LOCAL_SPECIFIER struct rb_execution_context_struct *ruby_current_ec;
NOINLINE(void rb_current_ec_set(struct rb_execution_context_struct *));
NOINLINE(struct rb_execution_context_struct *rb_current_ec_noinline(void));
// for RUBY_DEBUG_LOG()
RUBY_EXTERN RB_THREAD_LOCAL_SPECIFIER rb_atomic_t ruby_nt_serial;
#define RUBY_NT_SERIAL 1
# endif
# ifdef __APPLE__
// on Darwin, TLS can not be accessed across .so
struct rb_execution_context_struct *rb_current_ec(void);
# else
RUBY_EXTERN RB_THREAD_LOCAL_SPECIFIER struct rb_execution_context_struct *ruby_current_ec;
// for RUBY_DEBUG_LOG()
RUBY_EXTERN RB_THREAD_LOCAL_SPECIFIER rb_atomic_t ruby_nt_serial;
#define RUBY_NT_SERIAL 1
# endif
#else
typedef pthread_key_t native_tls_key_t;

thread_pthread_mn.c (new file) | 843

@ -0,0 +1,843 @@
// included by "thread_pthread.c"
#if USE_MN_THREADS
static void timer_thread_unregister_waiting(rb_thread_t *th, int fd);
static bool
timer_thread_cancel_waiting(rb_thread_t *th)
{
bool canceled = false;
if (th->sched.waiting_reason.flags) {
rb_native_mutex_lock(&timer_th.waiting_lock);
{
if (th->sched.waiting_reason.flags) {
canceled = true;
ccan_list_del_init(&th->sched.waiting_reason.node);
if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd);
}
th->sched.waiting_reason.flags = thread_sched_waiting_none;
}
}
rb_native_mutex_unlock(&timer_th.waiting_lock);
}
return canceled;
}
static void
ubf_event_waiting(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
struct rb_thread_sched *sched = TH_SCHED(th);
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));
// only once. it is safe because th->interrupt_lock is already acquired.
th->unblock.func = NULL;
th->unblock.arg = NULL;
bool canceled = timer_thread_cancel_waiting(th);
thread_sched_lock(sched, th);
{
if (sched->running == th) {
RUBY_DEBUG_LOG("not waiting yet");
}
else if (canceled) {
thread_sched_to_ready_common(sched, th, true, false);
}
else {
RUBY_DEBUG_LOG("already not waiting");
}
}
thread_sched_unlock(sched, th);
}
static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
// return true if timed out
static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT
volatile bool timedout = false, need_cancel = false;
if (timer_thread_register_waiting(th, fd, events, rel)) {
RUBY_DEBUG_LOG("wait fd:%d", fd);
RB_VM_SAVE_MACHINE_CONTEXT(th);
setup_ubf(th, ubf_event_waiting, (void *)th);
thread_sched_lock(sched, th);
{
if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
// already awaken
}
else if (RUBY_VM_INTERRUPTED(th->ec)) {
need_cancel = true;
}
else {
RUBY_DEBUG_LOG("sleep");
th->status = THREAD_STOPPED_FOREVER;
thread_sched_wakeup_next_thread(sched, th, true);
thread_sched_wait_running_turn(sched, th, true);
RUBY_DEBUG_LOG("wakeup");
}
timedout = th->sched.waiting_reason.data.result == 0;
}
thread_sched_unlock(sched, th);
if (need_cancel) {
timer_thread_cancel_waiting(th);
}
setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL?
th->status = THREAD_RUNNABLE;
}
else {
RUBY_DEBUG_LOG("can not wait fd:%d", fd);
return false;
}
VM_ASSERT(sched->running == th);
return timedout;
}
/// stack management
#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
#define MSTACK_PAGE_SIZE 4096
#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone
// 512MB chunk
// 131,072 pages (> 65,536)
// 0th page is Redzone. Start from 1st page.
/*
* <--> machine stack + vm stack
* ----------------------------------
* |HD...|RZ| ... |RZ| ... ... |RZ|
* <------------- 512MB ------------->
*/
static struct nt_stack_chunk_header {
struct nt_stack_chunk_header *prev_chunk;
struct nt_stack_chunk_header *prev_free_chunk;
uint16_t start_page;
uint16_t stack_count;
uint16_t uninitialized_stack_count;
uint16_t free_stack_pos;
uint16_t free_stack[];
} *nt_stack_chunks = NULL,
*nt_free_stack_chunks = NULL;
struct nt_machine_stack_footer {
struct nt_stack_chunk_header *ch;
size_t index;
};
static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
#include <sys/mman.h>
// vm_stack_size + machine_stack_size + 1 * (guard page size)
static inline size_t
nt_therad_stack_size(void)
{
static size_t msz;
if (LIKELY(msz > 0)) return msz;
rb_vm_t *vm = GET_VM();
int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
int page_num = (sz + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
msz = page_num * MSTACK_PAGE_SIZE;
return msz;
}
static struct nt_stack_chunk_header *
nt_alloc_thread_stack_chunk(void)
{
const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0);
if (m == MAP_FAILED) {
return NULL;
}
size_t msz = nt_therad_stack_size();
int header_page_cnt = 1;
int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;
if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
}
VM_ASSERT(stack_count <= UINT16_MAX);
struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m;
ch->start_page = header_page_cnt;
ch->prev_chunk = nt_stack_chunks;
ch->prev_free_chunk = nt_free_stack_chunks;
ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
ch->free_stack_pos = 0;
RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz);
return ch;
}
static void *
nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
{
const char *m = (char *)ch;
return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_therad_stack_size());
}
static struct nt_machine_stack_footer *
nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack)
{
// TODO: stack direction
const size_t msz = vm->default_params.thread_machine_stack_size;
return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)];
}
static void *
nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack)
{
// TODO: only support stack going down
// [VM ... <GUARD> machine stack ...]
const char *vstack, *mstack;
const char *guard_page;
vstack = nt_stack_chunk_get_stack_start(ch, idx);
guard_page = vstack + vm->default_params.thread_vm_stack_size;
mstack = guard_page + MSTACK_PAGE_SIZE;
struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
msf->ch = ch;
msf->index = idx;
#if 0
RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
vstack, (void *)(guard_page-1),
guard_page, (void *)(mstack-1),
mstack, (void *)(msf));
#endif
*vm_stack = (void *)vstack;
*machine_stack = (void *)mstack;
return (void *)guard_page;
}
RBIMPL_ATTR_MAYBE_UNUSED()
static void
nt_stack_chunk_dump(void)
{
struct nt_stack_chunk_header *ch;
int i;
fprintf(stderr, "** nt_stack_chunks\n");
ch = nt_stack_chunks;
for (i=0; ch; i++, ch = ch->prev_chunk) {
fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
}
fprintf(stderr, "** nt_free_stack_chunks\n");
ch = nt_free_stack_chunks;
for (i=0; ch; i++, ch = ch->prev_free_chunk) {
fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
}
}
static int
nt_guard_page(const char *p, size_t len)
{
if (mprotect((void *)p, len, PROT_NONE) != -1) {
return 0;
}
else {
return errno;
}
}
static int
nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
{
int err = 0;
rb_native_mutex_lock(&nt_machine_stack_lock);
{
retry:
if (nt_free_stack_chunks) {
struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
if (ch->free_stack_pos > 0) {
RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
}
else if (ch->uninitialized_stack_count > 0) {
RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);
size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE);
}
else {
nt_free_stack_chunks = ch->prev_free_chunk;
ch->prev_free_chunk = NULL;
goto retry;
}
}
else {
struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
if (p == NULL) {
err = errno;
}
else {
nt_free_stack_chunks = nt_stack_chunks = p;
goto retry;
}
}
}
rb_native_mutex_unlock(&nt_machine_stack_lock);
return err;
}
static void
nt_free_stack(void *mstack)
{
if (!mstack) return;
rb_native_mutex_lock(&nt_machine_stack_lock);
{
struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
struct nt_stack_chunk_header *ch = msf->ch;
int idx = (int)msf->index;
void *stack = nt_stack_chunk_get_stack_start(ch, idx);
RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);
if (ch->prev_free_chunk == NULL) {
ch->prev_free_chunk = nt_free_stack_chunks;
nt_free_stack_chunks = ch;
}
ch->free_stack[ch->free_stack_pos++] = idx;
// clear the stack pages
#if defined(MADV_FREE)
int r = madvise(stack, nt_therad_stack_size(), MADV_FREE);
#elif defined(MADV_DONTNEED)
int r = madvise(stack, nt_therad_stack_size(), MADV_DONTNEED);
#else
int r = 0;
#endif
if (r != 0) rb_bug("madvise errno:%d", errno);
}
rb_native_mutex_unlock(&nt_machine_stack_lock);
}
static int
native_thread_check_and_create_shared(rb_vm_t *vm)
{
bool need_to_make = false;
rb_native_mutex_lock(&vm->ractor.sched.lock);
{
unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor
if (((int)snt_cnt < MINIMUM_SNT) ||
(snt_cnt < vm->ractor.cnt &&
snt_cnt < vm->ractor.sched.max_cpu)) {
RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
vm->ractor.sched.snt_cnt,
vm->ractor.sched.dnt_cnt,
vm->ractor.cnt,
vm->ractor.sched.grq_cnt);
vm->ractor.sched.snt_cnt++;
need_to_make = true;
}
else {
RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
}
}
rb_native_mutex_unlock(&vm->ractor.sched.lock);
if (need_to_make) {
struct rb_native_thread *nt = native_thread_alloc();
nt->vm = vm;
return native_thread_create0(nt);
}
else {
return 0;
}
}
static COROUTINE
co_start(struct coroutine_context *from, struct coroutine_context *self)
{
rb_thread_t *th = (rb_thread_t *)self->argument;
struct rb_thread_sched *sched = TH_SCHED(th);
VM_ASSERT(th->nt != NULL);
VM_ASSERT(th == sched->running);
VM_ASSERT(sched->lock_owner == NULL);
// RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
thread_sched_set_lock_owner(sched, th);
thread_sched_add_running_thread(TH_SCHED(th), th);
thread_sched_unlock(sched, th);
{
call_thread_start_func_2(th);
}
thread_sched_lock(sched, NULL);
RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial);
// Thread is terminated
VM_ASSERT(!th_has_dedicated_nt(th));
rb_vm_t *vm = th->vm;
bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued
rb_thread_t *next_th = sched->running;
struct rb_native_thread *nt = th->nt;
native_thread_assign(NULL, th);
rb_ractor_set_current_ec(th->ractor, NULL);
if (!has_ready_ractor && next_th && !next_th->nt) {
// switch to the next thread
thread_sched_set_lock_owner(sched, NULL);
thread_sched_switch0(th->sched.context, next_th, nt);
th->sched.finished = true;
}
else {
// switch to the next Ractor
th->sched.finished = true;
coroutine_transfer(self, nt->nt_context);
}
rb_bug("unreachable");
}
static int
native_thread_create_shared(rb_thread_t *th)
{
// setup coroutine
rb_vm_t *vm = th->vm;
void *vm_stack = NULL, *machine_stack = NULL;
int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
if (err) return err;
VM_ASSERT(vm_stack < machine_stack);
// setup vm stack
size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);
// setup machine stack
size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
th->ec->machine.stack_maxsize = machine_stack_size; // TODO
th->sched.context_stack = machine_stack;
th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context));
coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
th->sched.context->argument = th;
RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
thread_sched_to_ready(TH_SCHED(th), th);
// setup nt
return native_thread_check_and_create_shared(th->vm);
}
#else // USE_MN_THREADS
static int
native_thread_create_shared(rb_thread_t *th)
{
rb_bug("unreachable");
}
static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
rb_bug("unreachable");
}
#endif // USE_MN_THREADS
/// EPOLL specific code
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
static bool
fd_readable_nonblock(int fd)
{
struct pollfd pfd = {
.fd = fd,
.events = POLLIN,
};
return poll(&pfd, 1, 0) != 0;
}
static bool
fd_writable_nonblock(int fd)
{
struct pollfd pfd = {
.fd = fd,
.events = POLLOUT,
};
return poll(&pfd, 1, 0) != 0;
}
static void
verify_waiting_list(void)
{
#if VM_CHECK_MODE > 0
rb_thread_t *wth, *prev_wth = NULL;
ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
// fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
if (prev_wth) {
rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout;
rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout;
VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
}
prev_wth = wth;
}
#endif
}
// return false if the fd is not waitable or not need to wait.
static bool
timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
{
RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
VM_ASSERT(flags != 0);
rb_hrtime_t abs = 0; // 0 means no timeout
if (rel) {
if (*rel > 0) {
flags |= thread_sched_waiting_timeout;
}
else {
return false;
}
}
if (rel && *rel > 0) {
flags |= thread_sched_waiting_timeout;
}
__uint32_t epoll_events = 0;
if (flags & thread_sched_waiting_timeout) {
VM_ASSERT(rel != NULL);
abs = rb_hrtime_add(rb_hrtime_now(), *rel);
}
if (flags & thread_sched_waiting_io_read) {
if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
RUBY_DEBUG_LOG("fd_readable_nonblock");
return false;
}
else {
VM_ASSERT(fd >= 0);
epoll_events |= EPOLLIN;
}
}
if (flags & thread_sched_waiting_io_write) {
if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
RUBY_DEBUG_LOG("fd_writable_nonblock");
return false;
}
else {
VM_ASSERT(fd >= 0);
epoll_events |= EPOLLOUT;
}
}
rb_native_mutex_lock(&timer_th.waiting_lock);
{
if (epoll_events) {
struct epoll_event event = {
.events = epoll_events,
.data = {
.ptr = (void *)th,
},
};
if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
RUBY_DEBUG_LOG("failed (%d)", errno);
switch (errno) {
case EBADF:
// the fd is closed?
case EPERM:
// the fd doesn't support epoll
case EEXIST:
// the fd is already registered by another thread
rb_native_mutex_unlock(&timer_th.waiting_lock);
return false;
default:
perror("epoll_ctl");
rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
}
}
RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
}
if (th) {
VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
// setup waiting information
{
th->sched.waiting_reason.flags = flags;
th->sched.waiting_reason.data.timeout = abs;
th->sched.waiting_reason.data.fd = fd;
th->sched.waiting_reason.data.result = 0;
}
if (abs == 0) { // no timeout
VM_ASSERT(!(flags & thread_sched_waiting_timeout));
ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
}
else {
RUBY_DEBUG_LOG("abs:%lu", abs);
VM_ASSERT(flags & thread_sched_waiting_timeout);
// insert th to sorted list (TODO: O(n))
rb_thread_t *wth, *prev_wth = NULL;
ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
wth->sched.waiting_reason.data.timeout < abs) {
prev_wth = wth;
}
else {
break;
}
}
if (prev_wth) {
ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node);
}
else {
ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
}
verify_waiting_list();
// update timeout seconds
timer_thread_wakeup();
}
}
else {
VM_ASSERT(abs == 0);
}
}
rb_native_mutex_unlock(&timer_th.waiting_lock);
return true;
}
static void
timer_thread_unregister_waiting(rb_thread_t *th, int fd)
{
RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
// Linux 2.6.9 or later is needed to pass NULL as data.
if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
switch (errno) {
case EBADF:
// just ignore. maybe fd is closed.
break;
default:
perror("epoll_ctl");
rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
}
}
}
static void
timer_thread_setup_nm(void)
{
if ((timer_th.epoll_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.epoll_fd);
timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
}
/*
* The purpose of the timer thread:
*
* (1) Periodic checking
* (1-1) Provide time slice for active NTs
* (1-2) Check NT shortage
* (1-3) Periodic UBF (global)
* (1-4) Lazy GRQ deq start
* (2) Receive notification
* (2-1) async I/O termination
* (2-2) timeout
* (2-2-1) sleep(n)
* (2-2-2) timeout(n), I/O, ...
*/
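
As a simplified mental model of how the jobs listed above are driven (the actual timer-thread entry point and its termination condition live elsewhere in the patch; `timer_thread_should_run` is a placeholder):

    // Sketch only: the dedicated timer thread polls repeatedly. Each iteration
    // either times out (periodic checking, item (1)) or returns with fd/pipe
    // events (notifications, item (2)).
    static void
    example_timer_thread_loop(rb_vm_t *vm)
    {
        while (timer_thread_should_run(vm)) { // placeholder termination check
            timer_thread_polling(vm);
        }
    }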
static void
timer_thread_polling(rb_vm_t *vm)
{
int r = epoll_wait(timer_th.epoll_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);
switch (r) {
case 0: // timeout
RUBY_DEBUG_LOG("timeout%s", "");
ractor_sched_lock(vm, NULL);
{
// (1-1) timeslice
timer_thread_check_timeslice(vm);
// (1-4) lazy grq deq
if (vm->ractor.sched.grq_cnt > 0) {
RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
rb_native_cond_signal(&vm->ractor.sched.cond);
}
}
ractor_sched_unlock(vm, NULL);
// (1-2)
native_thread_check_and_create_shared(vm);
break;
case -1:
switch (errno) {
case EINTR:
// simply retry
break;
default:
perror("epoll_wait");
rb_bug("epoll_wait errno:%d", errno);
}
break;
default:
RUBY_DEBUG_LOG("%d event(s)", r);
for (int i=0; i<r; i++) {
rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;
if (th == NULL) {
                // the timer thread itself was woken up via the communication pipe
RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
consume_communication_pipe(timer_th.comm_fds[0]);
}
else {
// wakeup specific thread by IO
uint32_t events = timer_th.finished_events[i].events;
RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
rb_th_serial(th),
(events & EPOLLIN) ? "in/" : "",
(events & EPOLLOUT) ? "out/" : "",
(events & EPOLLRDHUP) ? "RDHUP/" : "",
(events & EPOLLPRI) ? "pri/" : "",
(events & EPOLLERR) ? "err/" : "",
(events & EPOLLHUP) ? "hup/" : "");
rb_native_mutex_lock(&timer_th.waiting_lock);
{
if (th->sched.waiting_reason.flags) {
// delete from chain
ccan_list_del_init(&th->sched.waiting_reason.node);
timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd);
th->sched.waiting_reason.flags = thread_sched_waiting_none;
th->sched.waiting_reason.data.fd = -1;
th->sched.waiting_reason.data.result = (int)events;
timer_thread_wakeup_thread(th);
}
else {
// already released
}
}
rb_native_mutex_unlock(&timer_th.waiting_lock);
}
}
}
}
#else // HAVE_SYS_EPOLL_H
static void
timer_thread_setup_nm(void)
{
// do nothing
}
static void
timer_thread_polling(rb_vm_t *vm)
{
int timeout = timer_thread_set_timeout(vm);
struct pollfd pfd = {
.fd = timer_th.comm_fds[0],
.events = POLLIN,
};
int r = poll(&pfd, 1, timeout);
switch (r) {
case 0: // timeout
rb_native_mutex_lock(&vm->ractor.sched.lock);
{
// (1-1) timeslice
timer_thread_check_timeslice(vm);
}
rb_native_mutex_unlock(&vm->ractor.sched.lock);
break;
case -1: // error
switch (errno) {
case EINTR:
// simply retry
break;
default:
perror("poll");
rb_bug("poll errno:%d", errno);
break;
        }
        break;
case 1:
consume_communication_pipe(timer_th.comm_fds[0]);
break;
default:
rb_bug("unreachbale");
}
}
#endif // HAVE_SYS_EPOLL_H
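
Both back ends are woken through the communication pipe `timer_th.comm_fds`: a writer puts a byte on `comm_fds[1]`, which makes `epoll_wait`/`poll` return, and the timer thread then drains `comm_fds[0]`. A rough sketch of such a drain step, assuming a non-blocking read end (the patch's actual `consume_communication_pipe()` may differ):

    // Sketch: read a non-blocking wakeup pipe until it is empty.
    static void
    example_drain_pipe(int fd)
    {
        char buf[1024];
        for (;;) {
            ssize_t r = read(fd, buf, sizeof(buf));
            if (r > 0) continue;                   // drained some bytes, keep going
            if (r < 0 && errno == EINTR) continue; // interrupted, retry
            return;                                // empty (EAGAIN), EOF, or error
        }
    }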


@ -41,6 +41,8 @@ struct queue_sleep_arg {
static void
sync_wakeup(struct ccan_list_head *head, long max)
{
RUBY_DEBUG_LOG("max:%ld", max);
struct sync_waiter *cur = 0, *next;
ccan_list_for_each_safe(head, cur, next, node) {
@ -51,6 +53,7 @@ sync_wakeup(struct ccan_list_head *head, long max)
rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
}
else {
RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
rb_threadptr_interrupt(cur->th);
cur->th->status = THREAD_RUNNABLE;
}
@ -251,6 +254,8 @@ rb_mutex_trylock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
if (mutex->fiber == 0) {
RUBY_DEBUG_LOG("%p ok", mutex);
rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_thread_t *th = GET_THREAD();
mutex->fiber = fiber;
@ -258,17 +263,12 @@ rb_mutex_trylock(VALUE self)
mutex_locked(th, self);
return Qtrue;
}
return Qfalse;
else {
RUBY_DEBUG_LOG("%p ng", mutex);
return Qfalse;
}
}
/*
* At maximum, only one thread can use cond_timedwait and watch deadlock
 * periodically. Multiple polling threads (i.e. concurrent deadlock checks)
 * introduce new race conditions. [Bug #6278] [ruby-core:44275]
*/
static const rb_thread_t *patrol_thread = NULL;
static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
@ -290,6 +290,8 @@ delete_from_waitq(VALUE value)
return Qnil;
}
static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
@ -297,6 +299,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
rb_thread_t *th = ec->thread_ptr;
rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
rb_atomic_t saved_ints = 0;
/* When running trap handler */
if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
@ -310,6 +313,8 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
while (mutex->fiber != fiber) {
VM_ASSERT(mutex->fiber != NULL);
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
struct sync_waiter sync_waiter = {
@ -331,51 +336,47 @@ do_mutex_lock(VALUE self, int interruptible_p)
rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
}
enum rb_thread_status prev_status = th->status;
rb_hrtime_t *timeout = 0;
rb_hrtime_t rel = rb_msec2hrtime(100);
th->status = THREAD_STOPPED_FOREVER;
th->locking_mutex = self;
rb_ractor_sleeper_threads_inc(th->ractor);
/*
 * Careful! While some contended threads are in native_sleep(),
 * ractor->sleeper is an unstable value. We have to avoid both
 * deadlock and busy looping.
*/
if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
!patrol_thread) {
timeout = &rel;
patrol_thread = th;
}
struct sync_waiter sync_waiter = {
.self = self,
.th = th,
.fiber = nonblocking_fiber(fiber)
.fiber = nonblocking_fiber(fiber),
};
RUBY_DEBUG_LOG("%p wait", mutex);
            // This is similar to `sleep_forever`, but
            // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
            // An ensure clause like the following would be needed, but `rb_ensure` is a bit slow.
//
// begin
// sleep_forever(th, SLEEP_DEADLOCKABLE);
// ensure
// ccan_list_del(&sync_waiter.node);
// end
enum rb_thread_status prev_status = th->status;
th->status = THREAD_STOPPED_FOREVER;
rb_ractor_sleeper_threads_inc(th->ractor);
rb_check_deadlock(th->ractor);
th->locking_mutex = self;
ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
native_sleep(th, timeout); /* release GVL */
{
native_sleep(th, NULL);
}
ccan_list_del(&sync_waiter.node);
// unlocked by another thread while sleeping
if (!mutex->fiber) {
mutex->fiber = fiber;
}
if (patrol_thread == th)
patrol_thread = NULL;
th->locking_mutex = Qfalse;
if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
rb_check_deadlock(th->ractor);
}
if (th->status == THREAD_STOPPED_FOREVER) {
th->status = prev_status;
}
rb_ractor_sleeper_threads_dec(th->ractor);
th->status = prev_status;
th->locking_mutex = Qfalse;
th->locking_mutex = Qfalse;
RUBY_DEBUG_LOG("%p wakeup", mutex);
}
if (interruptible_p) {
@ -387,11 +388,27 @@ do_mutex_lock(VALUE self, int interruptible_p)
mutex->fiber = fiber;
}
}
else {
// clear interrupt information
if (RUBY_VM_INTERRUPTED(th->ec)) {
// reset interrupts
if (saved_ints == 0) {
saved_ints = threadptr_get_interrupts(th);
}
else {
// ignore additional interrupts
threadptr_get_interrupts(th);
}
}
}
}
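        // restore the interrupt flags that were saved (and cleared) above while
        // acquiring the lock on the non-interruptible path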
if (saved_ints) th->ec->interrupt_flag = saved_ints;
if (mutex->fiber == fiber) mutex_locked(th, self);
}
RUBY_DEBUG_LOG("%p locked", mutex);
// assertion
if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
@ -435,6 +452,8 @@ rb_mutex_owned_p(VALUE self)
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
RUBY_DEBUG_LOG("%p", mutex);
if (mutex->fiber == 0) {
return "Attempt to unlock a mutex which is not locked";
}
@ -456,13 +475,14 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
}
else {
switch (cur->th->status) {
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
rb_threadptr_interrupt(cur->th);
return NULL;
case THREAD_STOPPED: /* probably impossible */
case THREAD_STOPPED: /* probably impossible */
rb_bug("unexpected THREAD_STOPPED");
case THREAD_KILLED:
case THREAD_KILLED:
/* not sure about this, possible in exit GC? */
rb_bug("unexpected THREAD_KILLED");
continue;


@ -148,12 +148,13 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
}
void
rb_thread_sched_init(struct rb_thread_sched *sched)
rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
{
if (GVL_DEBUG) fprintf(stderr, "sched init\n");
sched->lock = w32_mutex_create();
}
// per-ractor
void
rb_thread_sched_destroy(struct rb_thread_sched *sched)
{
@ -202,6 +203,11 @@ Init_native_thread(rb_thread_t *main_th)
main_th->nt->interrupt_event);
}
void
ruby_mn_threads_params(void)
{
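    // no-op: the win32 thread model keeps one dedicated native thread per Ruby
    // thread (see th_has_dedicated_nt below), so there are no M:N parameters to read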
}
static int
w32_wait_events(HANDLE *events, int count, DWORD timeout, rb_thread_t *th)
{
@ -637,20 +643,32 @@ thread_start_func_1(void *th_ptr)
RUBY_DEBUG_LOG("thread created th:%u, thid: %p, event: %p",
rb_th_serial(th), th->nt->thread_id, th->nt->interrupt_event);
thread_sched_to_running(TH_SCHED(th), th);
ruby_thread_set_native(th);
// kick threads
thread_start_func_2(th, th->ec->machine.stack_start);
w32_close_handle(thread_id);
RUBY_DEBUG_LOG("thread deleted th:%u", rb_th_serial(th));
return 0;
}
static int
native_thread_create(rb_thread_t *th)
{
const size_t stack_size = th->vm->default_params.thread_machine_stack_size + th->vm->default_params.thread_vm_stack_size;
// setup nt
const size_t stack_size = th->vm->default_params.thread_machine_stack_size;
th->nt = ZALLOC(struct rb_native_thread);
th->nt->thread_id = w32_create_thread(stack_size, thread_start_func_1, th);
// setup vm stack
size_t vm_stack_word_size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
void *vm_stack = ruby_xmalloc(vm_stack_word_size * sizeof(VALUE));
th->sched.vm_stack = vm_stack;
rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_word_size);
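    // this VM stack is released in rb_threadptr_sched_free()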
if ((th->nt->thread_id) == 0) {
return thread_errno;
}
@ -763,12 +781,6 @@ rb_thread_wakeup_timer_thread(int sig)
/* do nothing */
}
static VALUE
rb_thread_start_unblock_thread(void)
{
return Qfalse; /* no-op */
}
static void
rb_thread_create_timer_thread(void)
{
@ -841,26 +853,6 @@ rb_reserved_fd_p(int fd)
return 0;
}
int
rb_sigwait_fd_get(rb_thread_t *th)
{
return -1; /* TODO */
}
NORETURN(void rb_sigwait_fd_put(rb_thread_t *, int));
void
rb_sigwait_fd_put(rb_thread_t *th, int fd)
{
rb_bug("not implemented, should not be called");
}
NORETURN(void rb_sigwait_sleep(const rb_thread_t *, int, const rb_hrtime_t *));
void
rb_sigwait_sleep(const rb_thread_t *th, int fd, const rb_hrtime_t *rel)
{
rb_bug("not implemented, should not be called");
}
rb_nativethread_id_t
rb_nativethread_self(void)
{
@ -881,4 +873,134 @@ native_thread_native_thread_id(rb_thread_t *th)
}
#define USE_NATIVE_THREAD_NATIVE_THREAD_ID 1
void
rb_add_running_thread(rb_thread_t *th)
{
// do nothing
}
void
rb_del_running_thread(rb_thread_t *th)
{
// do nothing
}
static bool
th_has_dedicated_nt(const rb_thread_t *th)
{
return true;
}
void
rb_threadptr_sched_free(rb_thread_t *th)
{
ruby_xfree(th->nt);
ruby_xfree(th->sched.vm_stack);
}
void
rb_threadptr_remove(rb_thread_t *th)
{
// do nothing
}
void
rb_thread_sched_mark_zombies(rb_vm_t *vm)
{
// do nothing
}
static bool
vm_barrier_finish_p(rb_vm_t *vm)
{
RUBY_DEBUG_LOG("cnt:%u living:%u blocking:%u",
vm->ractor.blocking_cnt == vm->ractor.cnt,
vm->ractor.sync.barrier_cnt,
vm->ractor.cnt,
vm->ractor.blocking_cnt);
VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
return vm->ractor.blocking_cnt == vm->ractor.cnt;
}
void
rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr)
{
vm->ractor.sync.barrier_waiting = true;
RUBY_DEBUG_LOG("barrier start. cnt:%u living:%u blocking:%u",
vm->ractor.sync.barrier_cnt,
vm->ractor.cnt,
vm->ractor.blocking_cnt);
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
// send signal
rb_ractor_t *r = 0;
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
if (r != cr) {
rb_ractor_vm_barrier_interrupt_running_thread(r);
}
}
// wait
while (!vm_barrier_finish_p(vm)) {
rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_cond);
}
RUBY_DEBUG_LOG("cnt:%u barrier success", vm->ractor.sync.barrier_cnt);
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
vm->ractor.sync.barrier_waiting = false;
vm->ractor.sync.barrier_cnt++;
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
rb_native_cond_signal(&r->barrier_wait_cond);
}
}
void
rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
{
vm->ractor.sync.lock_owner = cr;
unsigned int barrier_cnt = vm->ractor.sync.barrier_cnt;
rb_thread_t *th = GET_THREAD();
bool running;
RB_VM_SAVE_MACHINE_CONTEXT(th);
if (rb_ractor_status_p(cr, ractor_running)) {
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
running = true;
}
else {
running = false;
}
VM_ASSERT(rb_ractor_status_p(cr, ractor_blocking));
if (vm_barrier_finish_p(vm)) {
RUBY_DEBUG_LOG("wakeup barrier owner");
rb_native_cond_signal(&vm->ractor.sync.barrier_cond);
}
else {
RUBY_DEBUG_LOG("wait for barrier finish");
}
// wait for restart
while (barrier_cnt == vm->ractor.sync.barrier_cnt) {
vm->ractor.sync.lock_owner = NULL;
rb_native_cond_wait(&cr->barrier_wait_cond, &vm->ractor.sync.lock);
VM_ASSERT(vm->ractor.sync.lock_owner == NULL);
vm->ractor.sync.lock_owner = cr;
}
RUBY_DEBUG_LOG("barrier is released. Acquire vm_lock");
if (running) {
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
}
vm->ractor.sync.lock_owner = NULL;
}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */


@ -27,7 +27,7 @@ struct rb_native_thread {
};
struct rb_thread_sched_item {
char dmy;
void *vm_stack;
};
struct rb_thread_sched {

vm.c

@ -489,13 +489,14 @@ bool ruby_vm_keep_script_lines;
#ifdef RB_THREAD_LOCAL_SPECIFIER
RB_THREAD_LOCAL_SPECIFIER rb_execution_context_t *ruby_current_ec;
#ifdef RUBY_NT_SERIAL
RB_THREAD_LOCAL_SPECIFIER rb_atomic_t ruby_nt_serial;
#endif
#ifdef __APPLE__
// no-inline decl in thread_pthread.h
rb_execution_context_t *
rb_current_ec(void)
rb_current_ec_noinline(void)
{
return ruby_current_ec;
}
@ -505,8 +506,16 @@ rb_current_ec_set(rb_execution_context_t *ec)
{
ruby_current_ec = ec;
}
#endif
#ifdef __APPLE__
rb_execution_context_t *
rb_current_ec(void)
{
return ruby_current_ec;
}
#endif
#else
native_tls_key_t ruby_current_ec_key;
#endif
@ -2805,6 +2814,8 @@ vm_mark_negative_cme(VALUE val, void *dmy)
return ID_TABLE_CONTINUE;
}
void rb_thread_sched_mark_zombies(rb_vm_t *vm);
void
rb_vm_mark(void *ptr)
{
@ -2876,6 +2887,7 @@ rb_vm_mark(void *ptr)
}
}
rb_thread_sched_mark_zombies(vm);
rb_rjit_mark();
}
@ -3289,12 +3301,16 @@ thread_mark(void *ptr)
RUBY_MARK_LEAVE("thread");
}
void rb_threadptr_sched_free(rb_thread_t *th); // thread_*.c
static void
thread_free(void *ptr)
{
rb_thread_t *th = ptr;
RUBY_FREE_ENTER("thread");
rb_threadptr_sched_free(th);
if (th->locking_mutex != Qfalse) {
rb_bug("thread_free: locking_mutex must be NULL (%p:%p)", (void *)th, (void *)th->locking_mutex);
}
@ -3308,7 +3324,8 @@ thread_free(void *ptr)
RUBY_GC_INFO("MRI main thread\n");
}
else {
ruby_xfree(th->nt); // TODO
// ruby_xfree(th->nt);
        // TODO: the MN system collects the nt, but without the MN system it should be freed here.
ruby_xfree(th);
}
@ -3429,8 +3446,10 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm)
th->ext_config.ractor_safe = true;
#if USE_RUBY_DEBUG_LOG
static rb_atomic_t thread_serial = 0;
static rb_atomic_t thread_serial = 1;
th->serial = RUBY_ATOMIC_FETCH_ADD(thread_serial, 1);
RUBY_DEBUG_LOG("th:%u", th->serial);
#endif
}
@ -4058,8 +4077,11 @@ Init_BareVM(void)
// setup ractor system
rb_native_mutex_initialize(&vm->ractor.sync.lock);
rb_native_cond_initialize(&vm->ractor.sync.barrier_cond);
rb_native_cond_initialize(&vm->ractor.sync.terminate_cond);
#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_initialize(&vm->ractor.sync.barrier_cond);
#endif
}
#ifndef _WIN32


@ -634,15 +634,51 @@ typedef struct rb_vm_struct {
struct rb_ractor_struct *lock_owner;
unsigned int lock_rec;
// barrier
bool barrier_waiting;
unsigned int barrier_cnt;
rb_nativethread_cond_t barrier_cond;
// join at exit
rb_nativethread_cond_t terminate_cond;
bool terminate_waiting;
#ifndef RUBY_THREAD_PTHREAD_H
bool barrier_waiting;
unsigned int barrier_cnt;
rb_nativethread_cond_t barrier_cond;
#endif
} sync;
// ractor scheduling
struct {
rb_nativethread_lock_t lock;
struct rb_ractor_struct *lock_owner;
bool locked;
rb_nativethread_cond_t cond; // GRQ
unsigned int snt_cnt; // count of shared NTs
unsigned int dnt_cnt; // count of dedicated NTs
unsigned int running_cnt;
unsigned int max_cpu;
        struct ccan_list_head grq; // Global Ready Queue
unsigned int grq_cnt;
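        // e.g. the timer thread signals `cond` while grq_cnt > 0 (see timer_thread_polling)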
// running threads
struct ccan_list_head running_threads;
// threads which switch context by timeslice
struct ccan_list_head timeslice_threads;
struct ccan_list_head zombie_threads;
        // true if the timeslice timer is not enabled
bool timeslice_wait_inf;
// barrier
rb_nativethread_cond_t barrier_complete_cond;
rb_nativethread_cond_t barrier_release_cond;
bool barrier_waiting;
unsigned int barrier_waiting_cnt;
unsigned int barrier_serial;
} sched;
} ractor;
#ifdef USE_SIGALTSTACK
@ -1739,6 +1775,7 @@ rb_vm_living_threads_init(rb_vm_t *vm)
ccan_list_head_init(&vm->waiting_fds);
ccan_list_head_init(&vm->workqueue);
ccan_list_head_init(&vm->ractor.set);
ccan_list_head_init(&vm->ractor.sched.zombie_threads);
}
typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);
@ -1839,6 +1876,20 @@ rb_current_execution_context(bool expect_ec)
#else
rb_execution_context_t *ec = ruby_current_ec;
#endif
    /* On shared objects, `__tls_get_addr()` is used to access the TLS,
     * and the address of `ruby_current_ec` can be stored in a function
     * frame. However, this address can be misused after a coroutine
     * migrates to another native thread:
     *   1) Get `ptr = &ruby_current_ec` on NT1 and store it in the frame.
     *   2) Context switch and resume the coroutine on NT2.
     *   3) `ptr` is used on NT2, but it still points into NT1's TLS.
     * This assertion checks for such misuse.
     *
     * To avoid accidents, `GET_EC()` should be called only once per frame.
     * Note that inlining can introduce this problem.
     */
VM_ASSERT(ec == rb_current_ec_noinline());
#else
rb_execution_context_t *ec = native_tls_get(ruby_current_ec_key);
#endif
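
To make the hazard described in the comment above concrete, here is an illustrative fragment of the pattern the assertion is meant to catch (not part of the patch; `call_that_may_migrate_nt` is a placeholder for any call after which the coroutine may resume on another native thread):

    void
    example_tls_misuse(void)
    {
        rb_execution_context_t *ec = GET_EC();  // __tls_get_addr() on NT1
        call_that_may_migrate_nt();             // coroutine resumes on NT2
        // A second GET_EC() here may reuse the TLS address cached in this frame
        // and still read NT1's slot -- call GET_EC() once per frame and pass
        // the result down instead.
        rb_execution_context_t *ec2 = GET_EC(); // potentially wrong under caching/inlining
        (void)ec; (void)ec2;
    }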

vm_sync.c

@ -5,7 +5,8 @@
#include "ractor_core.h"
#include "vm_debug.h"
static bool vm_barrier_finish_p(rb_vm_t *vm);
void rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr);
void rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr);
static bool
vm_locked(rb_vm_t *vm)
@ -52,56 +53,32 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsign
    // locking a ractor and then acquiring the VM lock will cause a deadlock
VM_ASSERT(cr->sync.locked_by != rb_ractor_self(cr));
#endif
// lock
rb_native_mutex_lock(&vm->ractor.sync.lock);
VM_ASSERT(vm->ractor.sync.lock_owner == NULL);
vm->ractor.sync.lock_owner = cr;
VM_ASSERT(vm->ractor.sync.lock_rec == 0);
if (!no_barrier) {
// barrier
while (vm->ractor.sync.barrier_waiting) {
unsigned int barrier_cnt = vm->ractor.sync.barrier_cnt;
rb_thread_t *th = GET_THREAD();
bool running;
#ifdef RUBY_THREAD_PTHREAD_H
if (!no_barrier &&
cr->threads.sched.running != NULL // ractor has running threads.
) {
RB_VM_SAVE_MACHINE_CONTEXT(th);
if (rb_ractor_status_p(cr, ractor_running)) {
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
running = true;
}
else {
running = false;
}
VM_ASSERT(rb_ractor_status_p(cr, ractor_blocking));
if (vm_barrier_finish_p(vm)) {
RUBY_DEBUG_LOG("wakeup barrier owner");
rb_native_cond_signal(&vm->ractor.sync.barrier_cond);
}
else {
RUBY_DEBUG_LOG("wait for barrier finish");
}
// wait for restart
while (barrier_cnt == vm->ractor.sync.barrier_cnt) {
vm->ractor.sync.lock_owner = NULL;
rb_native_cond_wait(&cr->barrier_wait_cond, &vm->ractor.sync.lock);
VM_ASSERT(vm->ractor.sync.lock_owner == NULL);
vm->ractor.sync.lock_owner = cr;
}
RUBY_DEBUG_LOG("barrier is released. Acquire vm_lock");
if (running) {
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
}
while (vm->ractor.sched.barrier_waiting) {
RUBY_DEBUG_LOG("barrier serial:%u", vm->ractor.sched.barrier_serial);
rb_ractor_sched_barrier_join(vm, cr);
}
}
#else
if (!no_barrier) {
while (vm->ractor.sync.barrier_waiting) {
rb_ractor_sched_barrier_join(vm, cr);
}
}
#endif
VM_ASSERT(vm->ractor.sync.lock_rec == 0);
VM_ASSERT(vm->ractor.sync.lock_owner == cr);
VM_ASSERT(vm->ractor.sync.lock_owner == NULL);
vm->ractor.sync.lock_owner = cr;
}
vm->ractor.sync.lock_rec++;
@ -114,8 +91,9 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsign
static void
vm_lock_leave(rb_vm_t *vm, unsigned int *lev APPEND_LOCATION_ARGS)
{
RUBY_DEBUG_LOG2(file, line, "rec:%u owner:%u", vm->ractor.sync.lock_rec,
(unsigned int)rb_ractor_id(vm->ractor.sync.lock_owner));
RUBY_DEBUG_LOG2(file, line, "rec:%u owner:%u%s", vm->ractor.sync.lock_rec,
(unsigned int)rb_ractor_id(vm->ractor.sync.lock_owner),
vm->ractor.sync.lock_rec == 1 ? " (leave)" : "");
ASSERT_vm_locking();
VM_ASSERT(vm->ractor.sync.lock_rec > 0);
@ -216,18 +194,6 @@ rb_vm_cond_timedwait(rb_vm_t *vm, rb_nativethread_cond_t *cond, unsigned long ms
vm_cond_wait(vm, cond, msec);
}
static bool
vm_barrier_finish_p(rb_vm_t *vm)
{
RUBY_DEBUG_LOG("cnt:%u living:%u blocking:%u",
vm->ractor.sync.barrier_cnt,
vm->ractor.cnt,
vm->ractor.blocking_cnt);
VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
return vm->ractor.blocking_cnt == vm->ractor.cnt;
}
void
rb_vm_barrier(void)
{
@ -239,45 +205,13 @@ rb_vm_barrier(void)
}
else {
rb_vm_t *vm = GET_VM();
VM_ASSERT(vm->ractor.sync.barrier_waiting == false);
VM_ASSERT(!vm->ractor.sched.barrier_waiting);
ASSERT_vm_locking();
rb_ractor_t *cr = vm->ractor.sync.lock_owner;
VM_ASSERT(cr == GET_RACTOR());
VM_ASSERT(rb_ractor_status_p(cr, ractor_running));
vm->ractor.sync.barrier_waiting = true;
RUBY_DEBUG_LOG("barrier start. cnt:%u living:%u blocking:%u",
vm->ractor.sync.barrier_cnt,
vm->ractor.cnt,
vm->ractor.blocking_cnt);
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
// send signal
rb_ractor_t *r = 0;
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
if (r != cr) {
rb_ractor_vm_barrier_interrupt_running_thread(r);
}
}
// wait
while (!vm_barrier_finish_p(vm)) {
rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_cond);
}
RUBY_DEBUG_LOG("cnt:%u barrier success", vm->ractor.sync.barrier_cnt);
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
vm->ractor.sync.barrier_waiting = false;
vm->ractor.sync.barrier_cnt++;
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
rb_native_cond_signal(&r->barrier_wait_cond);
}
rb_ractor_sched_barrier_start(vm, cr);
}
}