// Mirror of https://github.com/github/ruby.git
// 3833 lines, 98 KiB, C
// Ractor implementation
|
|
|
|
#include "ruby/ruby.h"
|
|
#include "ruby/thread.h"
|
|
#include "ruby/ractor.h"
|
|
#include "ruby/thread_native.h"
|
|
#include "vm_core.h"
|
|
#include "eval_intern.h"
|
|
#include "vm_sync.h"
|
|
#include "ractor_core.h"
|
|
#include "internal/complex.h"
|
|
#include "internal/error.h"
|
|
#include "internal/gc.h"
|
|
#include "internal/hash.h"
|
|
#include "internal/rational.h"
|
|
#include "internal/struct.h"
|
|
#include "internal/thread.h"
|
|
#include "variable.h"
|
|
#include "yjit.h"
|
|
#include "rjit.h"
|
|
|
|
// Class handles (rb_c* naming). Only rb_cRactor has external linkage.
VALUE rb_cRactor;
static VALUE rb_cRactorSelector;
static VALUE rb_cRactorMovedObject;

// Exception handles with external linkage.
VALUE rb_eRactorUnsafeError;
VALUE rb_eRactorIsolationError;

// Exception handles private to this file; they are passed to
// rb_raise()/rb_exc_new_cstr() by the functions below.
static VALUE rb_eRactorError;
static VALUE rb_eRactorRemoteError;
static VALUE rb_eRactorMovedError;
static VALUE rb_eRactorClosedError;
|
|
|
|
static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
|
|
|
|
// Ractor locking
|
|
|
|
static void
|
|
ASSERT_ractor_unlocking(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
// GET_EC is NULL in an RJIT worker
|
|
if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) {
|
|
rb_bug("recursive ractor locking");
|
|
}
|
|
#endif
|
|
}
|
|
|
|
static void
|
|
ASSERT_ractor_locking(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
// GET_EC is NULL in an RJIT worker
|
|
if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) {
|
|
rp(r->sync.locked_by);
|
|
rb_bug("ractor lock is not acquired.");
|
|
}
|
|
#endif
|
|
}
|
|
|
|
static void
|
|
ractor_lock(rb_ractor_t *r, const char *file, int line)
|
|
{
|
|
RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
|
|
|
|
ASSERT_ractor_unlocking(r);
|
|
rb_native_mutex_lock(&r->sync.lock);
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an RJIT worker
|
|
rb_ractor_t *cr = rb_current_ractor_raw(false);
|
|
r->sync.locked_by = cr ? rb_ractor_self(cr) : Qundef;
|
|
}
|
|
#endif
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
|
|
}
|
|
|
|
static void
|
|
ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
#if RACTOR_CHECK_MODE > 0
|
|
VM_ASSERT(cr->sync.locked_by != cr->pub.self);
|
|
#endif
|
|
ractor_lock(cr, file, line);
|
|
}
|
|
|
|
static void
|
|
ractor_unlock(rb_ractor_t *r, const char *file, int line)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
#if RACTOR_CHECK_MODE > 0
|
|
r->sync.locked_by = Qnil;
|
|
#endif
|
|
rb_native_mutex_unlock(&r->sync.lock);
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
|
|
}
|
|
|
|
static void
|
|
ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
#if RACTOR_CHECK_MODE > 0
|
|
VM_ASSERT(cr->sync.locked_by == cr->pub.self);
|
|
#endif
|
|
ractor_unlock(cr, file, line);
|
|
}
|
|
|
|
// Call-site wrappers: forward the ractor together with the caller's
// __FILE__/__LINE__ so that lock traffic can be attributed in debug logs.
#define RACTOR_LOCK(r)        ractor_lock((r), __FILE__, __LINE__)
#define RACTOR_UNLOCK(r)      ractor_unlock((r), __FILE__, __LINE__)
#define RACTOR_LOCK_SELF(r)   ractor_lock_self((r), __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self((r), __FILE__, __LINE__)
|
|
|
|
void
|
|
rb_ractor_lock_self(rb_ractor_t *r)
|
|
{
|
|
RACTOR_LOCK_SELF(r);
|
|
}
|
|
|
|
void
|
|
rb_ractor_unlock_self(rb_ractor_t *r)
|
|
{
|
|
RACTOR_UNLOCK_SELF(r);
|
|
}
|
|
|
|
// Ractor status
|
|
|
|
static const char *
|
|
ractor_status_str(enum ractor_status status)
|
|
{
|
|
switch (status) {
|
|
case ractor_created: return "created";
|
|
case ractor_running: return "running";
|
|
case ractor_blocking: return "blocking";
|
|
case ractor_terminated: return "terminated";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
static void
|
|
ractor_status_set(rb_ractor_t *r, enum ractor_status status)
|
|
{
|
|
RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status));
|
|
|
|
// check 1
|
|
if (r->status_ != ractor_created) {
|
|
VM_ASSERT(r == GET_RACTOR()); // only self-modification is allowed.
|
|
ASSERT_vm_locking();
|
|
}
|
|
|
|
// check2: transition check. assume it will be vanished on non-debug build.
|
|
switch (r->status_) {
|
|
case ractor_created:
|
|
VM_ASSERT(status == ractor_blocking);
|
|
break;
|
|
case ractor_running:
|
|
VM_ASSERT(status == ractor_blocking||
|
|
status == ractor_terminated);
|
|
break;
|
|
case ractor_blocking:
|
|
VM_ASSERT(status == ractor_running);
|
|
break;
|
|
case ractor_terminated:
|
|
rb_bug("unreachable");
|
|
break;
|
|
}
|
|
|
|
r->status_ = status;
|
|
}
|
|
|
|
static bool
|
|
ractor_status_p(rb_ractor_t *r, enum ractor_status status)
|
|
{
|
|
return rb_ractor_status_p(r, status);
|
|
}
|
|
|
|
// Ractor data/mark/free
|
|
|
|
static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i);
|
|
static void ractor_local_storage_mark(rb_ractor_t *r);
|
|
static void ractor_local_storage_free(rb_ractor_t *r);
|
|
|
|
static void
|
|
ractor_queue_mark(struct rb_ractor_queue *rq)
|
|
{
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
|
|
rb_gc_mark(b->sender);
|
|
|
|
switch (b->type.e) {
|
|
case basket_type_yielding:
|
|
case basket_type_take_basket:
|
|
case basket_type_deleted:
|
|
case basket_type_reserved:
|
|
// ignore
|
|
break;
|
|
default:
|
|
rb_gc_mark(b->p.send.v);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_mark(void *ptr)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)ptr;
|
|
|
|
ractor_queue_mark(&r->sync.recv_queue);
|
|
ractor_queue_mark(&r->sync.takers_queue);
|
|
|
|
rb_gc_mark(r->receiving_mutex);
|
|
|
|
rb_gc_mark(r->loc);
|
|
rb_gc_mark(r->name);
|
|
rb_gc_mark(r->r_stdin);
|
|
rb_gc_mark(r->r_stdout);
|
|
rb_gc_mark(r->r_stderr);
|
|
rb_hook_list_mark(&r->pub.hooks);
|
|
|
|
if (r->threads.cnt > 0) {
|
|
rb_thread_t *th = 0;
|
|
ccan_list_for_each(&r->threads.set, th, lt_node) {
|
|
VM_ASSERT(th != NULL);
|
|
rb_gc_mark(th->self);
|
|
}
|
|
}
|
|
|
|
ractor_local_storage_mark(r);
|
|
}
|
|
|
|
static void
|
|
ractor_queue_free(struct rb_ractor_queue *rq)
|
|
{
|
|
free(rq->baskets);
|
|
}
|
|
|
|
static void
|
|
ractor_free(void *ptr)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)ptr;
|
|
RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
|
|
rb_native_mutex_destroy(&r->sync.lock);
|
|
#ifdef RUBY_THREAD_WIN32_H
|
|
rb_native_cond_destroy(&r->sync.cond);
|
|
#endif
|
|
ractor_queue_free(&r->sync.recv_queue);
|
|
ractor_queue_free(&r->sync.takers_queue);
|
|
ractor_local_storage_free(r);
|
|
rb_hook_list_free(&r->pub.hooks);
|
|
ruby_xfree(r);
|
|
}
|
|
|
|
static size_t
|
|
ractor_queue_memsize(const struct rb_ractor_queue *rq)
|
|
{
|
|
return sizeof(struct rb_ractor_basket) * rq->size;
|
|
}
|
|
|
|
static size_t
|
|
ractor_memsize(const void *ptr)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)ptr;
|
|
|
|
// TODO: more correct?
|
|
return sizeof(rb_ractor_t) +
|
|
ractor_queue_memsize(&r->sync.recv_queue) +
|
|
ractor_queue_memsize(&r->sync.takers_queue);
|
|
}
|
|
|
|
static const rb_data_type_t ractor_data_type = {
|
|
"ractor",
|
|
{
|
|
ractor_mark,
|
|
ractor_free,
|
|
ractor_memsize,
|
|
NULL, // update
|
|
},
|
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */
|
|
};
|
|
|
|
bool
|
|
rb_ractor_p(VALUE gv)
|
|
{
|
|
if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) {
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static inline rb_ractor_t *
|
|
RACTOR_PTR(VALUE self)
|
|
{
|
|
VM_ASSERT(rb_ractor_p(self));
|
|
rb_ractor_t *r = DATA_PTR(self);
|
|
return r;
|
|
}
|
|
|
|
static rb_atomic_t ractor_last_id;
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
uint32_t
|
|
rb_ractor_current_id(void)
|
|
{
|
|
if (GET_THREAD()->ractor == NULL) {
|
|
return 1; // main ractor
|
|
}
|
|
else {
|
|
return rb_ractor_id(GET_RACTOR());
|
|
}
|
|
}
|
|
#endif
|
|
|
|
// Ractor queue
|
|
|
|
static void
|
|
ractor_queue_setup(struct rb_ractor_queue *rq)
|
|
{
|
|
rq->size = 2;
|
|
rq->cnt = 0;
|
|
rq->start = 0;
|
|
rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
|
|
}
|
|
|
|
static struct rb_ractor_basket *
|
|
ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
|
{
|
|
if (r != NULL) ASSERT_ractor_locking(r);
|
|
return &rq->baskets[rq->start];
|
|
}
|
|
|
|
static struct rb_ractor_basket *
|
|
ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
|
|
{
|
|
if (r != NULL) ASSERT_ractor_locking(r);
|
|
return &rq->baskets[(rq->start + i) % rq->size];
|
|
}
|
|
|
|
static void
|
|
ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
if (rq->reserved_cnt == 0) {
|
|
rq->cnt--;
|
|
rq->start = (rq->start + 1) % rq->size;
|
|
rq->serial++;
|
|
}
|
|
else {
|
|
ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted;
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
|
|
{
|
|
struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
|
|
return basket_type_p(b, basket_type_deleted) ||
|
|
basket_type_p(b, basket_type_reserved);
|
|
}
|
|
|
|
static void
|
|
ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) {
|
|
ractor_queue_advance(r, rq);
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
if (rq->cnt == 0) {
|
|
return true;
|
|
}
|
|
|
|
ractor_queue_compact(r, rq);
|
|
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
if (!ractor_queue_skip_p(r, rq, i)) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
static bool
|
|
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
if (!ractor_queue_skip_p(r, rq, i)) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
|
|
*basket = *b;
|
|
|
|
// remove from queue
|
|
b->type.e = basket_type_deleted;
|
|
ractor_queue_compact(r, rq);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static void
|
|
ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
if (rq->size <= rq->cnt) {
|
|
rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
|
|
for (int i=rq->size - rq->start; i<rq->cnt; i++) {
|
|
rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
|
|
}
|
|
rq->size *= 2;
|
|
}
|
|
rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
|
|
// fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
|
|
}
|
|
|
|
static void
|
|
ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
|
{
|
|
basket->type.e = basket_type_deleted;
|
|
}
|
|
|
|
// Ractor basket
|
|
|
|
static VALUE ractor_reset_belonging(VALUE obj); // in this file
|
|
|
|
static VALUE
|
|
ractor_basket_value(struct rb_ractor_basket *b)
|
|
{
|
|
switch (b->type.e) {
|
|
case basket_type_ref:
|
|
break;
|
|
case basket_type_copy:
|
|
case basket_type_move:
|
|
case basket_type_will:
|
|
b->type.e = basket_type_ref;
|
|
b->p.send.v = ractor_reset_belonging(b->p.send.v);
|
|
break;
|
|
default:
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
return b->p.send.v;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_basket_accept(struct rb_ractor_basket *b)
|
|
{
|
|
VALUE v = ractor_basket_value(b);
|
|
|
|
if (b->p.send.exception) {
|
|
VALUE cause = v;
|
|
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
|
|
rb_ivar_set(err, rb_intern("@ractor"), b->sender);
|
|
rb_ec_setup_exception(NULL, err, cause);
|
|
rb_exc_raise(err);
|
|
}
|
|
|
|
return v;
|
|
}
|
|
|
|
// Ractor synchronizations
|
|
|
|
#if USE_RUBY_DEBUG_LOG
|
|
static const char *
|
|
wait_status_str(enum rb_ractor_wait_status wait_status)
|
|
{
|
|
switch ((int)wait_status) {
|
|
case wait_none: return "none";
|
|
case wait_receiving: return "receiving";
|
|
case wait_taking: return "taking";
|
|
case wait_yielding: return "yielding";
|
|
case wait_receiving|wait_taking: return "receiving|taking";
|
|
case wait_receiving|wait_yielding: return "receiving|yielding";
|
|
case wait_taking|wait_yielding: return "taking|yielding";
|
|
case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
static const char *
|
|
wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
|
|
{
|
|
switch (wakeup_status) {
|
|
case wakeup_none: return "none";
|
|
case wakeup_by_send: return "by_send";
|
|
case wakeup_by_yield: return "by_yield";
|
|
case wakeup_by_take: return "by_take";
|
|
case wakeup_by_close: return "by_close";
|
|
case wakeup_by_interrupt: return "by_interrupt";
|
|
case wakeup_by_retry: return "by_retry";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
static const char *
|
|
basket_type_name(enum rb_ractor_basket_type type)
|
|
{
|
|
switch (type) {
|
|
case basket_type_none: return "none";
|
|
case basket_type_ref: return "ref";
|
|
case basket_type_copy: return "copy";
|
|
case basket_type_move: return "move";
|
|
case basket_type_will: return "will";
|
|
case basket_type_deleted: return "deleted";
|
|
case basket_type_reserved: return "reserved";
|
|
case basket_type_take_basket: return "take_basket";
|
|
case basket_type_yielding: return "yielding";
|
|
}
|
|
VM_ASSERT(0);
|
|
return NULL;
|
|
}
|
|
#endif // USE_RUBY_DEBUG_LOG
|
|
|
|
static bool
|
|
ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
|
|
{
|
|
return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
|
|
}
|
|
|
|
#ifdef RUBY_THREAD_PTHREAD_H
|
|
// thread_*.c
|
|
void rb_ractor_sched_wakeup(rb_ractor_t *r);
|
|
#else
|
|
|
|
static void
|
|
rb_ractor_sched_wakeup(rb_ractor_t *r)
|
|
{
|
|
rb_native_cond_broadcast(&r->sync.cond);
|
|
}
|
|
#endif
|
|
|
|
|
|
static bool
|
|
ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
|
|
rb_ractor_id(r),
|
|
wait_status_str(r->sync.wait.status),
|
|
wait_status_str(wait_status),
|
|
wakeup_status_str(wakeup_status));
|
|
|
|
if (ractor_sleeping_by(r, wait_status)) {
|
|
r->sync.wait.wakeup_status = wakeup_status;
|
|
rb_ractor_sched_wakeup(r);
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_sleep_interrupt(void *ptr)
|
|
{
|
|
rb_ractor_t *r = ptr;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
|
|
typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
|
|
|
|
static void
|
|
ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data)
|
|
{
|
|
if (cr->sync.wait.status != wait_none) {
|
|
enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status;
|
|
cr->sync.wait.status = wait_none;
|
|
cr->sync.wait.wakeup_status = wakeup_by_interrupt;
|
|
|
|
RACTOR_UNLOCK(cr);
|
|
{
|
|
if (cf_func) {
|
|
int state;
|
|
EC_PUSH_TAG(ec);
|
|
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
|
rb_thread_check_ints();
|
|
}
|
|
EC_POP_TAG();
|
|
|
|
if (state) {
|
|
(*cf_func)(cr, cf_data);
|
|
EC_JUMP_TAG(ec, state);
|
|
}
|
|
}
|
|
else {
|
|
rb_thread_check_ints();
|
|
}
|
|
}
|
|
|
|
// reachable?
|
|
RACTOR_LOCK(cr);
|
|
cr->sync.wait.status = prev_wait_status;
|
|
}
|
|
}
|
|
|
|
#ifdef RUBY_THREAD_PTHREAD_H
|
|
void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
|
|
#else
|
|
|
|
// win32
|
|
static void
|
|
ractor_cond_wait(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
VALUE locked_by = r->sync.locked_by;
|
|
r->sync.locked_by = Qnil;
|
|
#endif
|
|
rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
r->sync.locked_by = locked_by;
|
|
#endif
|
|
}
|
|
|
|
static void *
|
|
ractor_sleep_wo_gvl(void *ptr)
|
|
{
|
|
rb_ractor_t *cr = ptr;
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
VM_ASSERT(cr->sync.wait.status != wait_none);
|
|
if (cr->sync.wait.wakeup_status == wakeup_none) {
|
|
ractor_cond_wait(cr);
|
|
}
|
|
cr->sync.wait.status = wait_none;
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
|
|
{
|
|
RACTOR_UNLOCK(cr);
|
|
{
|
|
rb_nogvl(ractor_sleep_wo_gvl, cr,
|
|
ubf, cr,
|
|
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
|
|
}
|
|
RACTOR_LOCK(cr);
|
|
}
|
|
#endif
|
|
|
|
static enum rb_ractor_wakeup_status
|
|
ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
|
|
ractor_sleep_cleanup_function cf_func, void *cf_data)
|
|
{
|
|
enum rb_ractor_wakeup_status wakeup_status;
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
|
|
// TODO: multi-threads
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
VM_ASSERT(wait_status != wait_none);
|
|
cr->sync.wait.status = wait_status;
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
|
|
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
|
|
// wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
|
|
|
|
RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
|
|
|
|
while (cr->sync.wait.wakeup_status == wakeup_none) {
|
|
rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
|
|
ractor_check_ints(ec, cr, cf_func, cf_data);
|
|
}
|
|
|
|
cr->sync.wait.status = wait_none;
|
|
|
|
// TODO: multi-thread
|
|
wakeup_status = cr->sync.wait.wakeup_status;
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
|
|
RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
|
|
|
|
return wakeup_status;
|
|
}
|
|
|
|
static enum rb_ractor_wakeup_status
|
|
ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status)
|
|
{
|
|
return ractor_sleep_with_cleanup(ec, cr, wait_status, 0, NULL);
|
|
}
|
|
|
|
// Ractor.receive
|
|
|
|
static void
|
|
ractor_recursive_receive_if(rb_ractor_t *r)
|
|
{
|
|
if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
|
|
rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
|
|
{
|
|
struct rb_ractor_basket basket;
|
|
ractor_recursive_receive_if(cr);
|
|
bool received = false;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt);
|
|
received = ractor_queue_deq(cr, rq, &basket);
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
if (!received) {
|
|
if (cr->sync.incoming_port_closed) {
|
|
rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
|
|
}
|
|
return Qundef;
|
|
}
|
|
else {
|
|
return ractor_basket_accept(&basket);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
|
|
{
|
|
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
|
|
ractor_recursive_receive_if(cr);
|
|
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
while (ractor_queue_empty_p(cr, rq)) {
|
|
ractor_sleep(ec, cr, wait_receiving);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
|
|
VALUE v;
|
|
struct rb_ractor_queue *rq = &cr->sync.recv_queue;
|
|
|
|
while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) {
|
|
ractor_wait_receive(ec, cr, rq);
|
|
}
|
|
|
|
return v;
|
|
}
|
|
|
|
#if 0
// Disabled debugging aid: prints every basket in rq to stderr and aborts via
// rb_bug() if a reserved basket is found. Kept compilable: balanced
// parentheses and the current basket member names (type.e, p.send.v).
// Assumes each payload is an array whose element 1 is a string — debug use
// only. basket_type_name() is available only under USE_RUBY_DEBUG_LOG.
static void
rq_dump(struct rb_ractor_queue *rq)
{
    bool bug = false;
    for (int i=0; i<rq->cnt; i++) {
        struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
        fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type.e),
                (void *)b, RSTRING_PTR(RARRAY_AREF(b->p.send.v, 1)));
        if (basket_type_p(b, basket_type_reserved)) bug = true;
    }
    if (bug) rb_bug("!!");
}
#endif
|
|
|
|
struct receive_block_data {
|
|
rb_ractor_t *cr;
|
|
struct rb_ractor_queue *rq;
|
|
VALUE v;
|
|
int index;
|
|
bool success;
|
|
};
|
|
|
|
static void
|
|
ractor_receive_if_lock(rb_ractor_t *cr)
|
|
{
|
|
VALUE m = cr->receiving_mutex;
|
|
if (m == Qfalse) {
|
|
m = cr->receiving_mutex = rb_mutex_new();
|
|
}
|
|
rb_mutex_lock(m);
|
|
}
|
|
|
|
static VALUE
|
|
receive_if_body(VALUE ptr)
|
|
{
|
|
struct receive_block_data *data = (struct receive_block_data *)ptr;
|
|
|
|
ractor_receive_if_lock(data->cr);
|
|
VALUE block_result = rb_yield(data->v);
|
|
rb_ractor_t *cr = data->cr;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
|
|
VM_ASSERT(basket_type_p(b, basket_type_reserved));
|
|
data->rq->reserved_cnt--;
|
|
|
|
if (RTEST(block_result)) {
|
|
ractor_queue_delete(cr, data->rq, b);
|
|
ractor_queue_compact(cr, data->rq);
|
|
}
|
|
else {
|
|
b->type.e = basket_type_ref;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
data->success = true;
|
|
|
|
if (RTEST(block_result)) {
|
|
return data->v;
|
|
}
|
|
else {
|
|
return Qundef;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
receive_if_ensure(VALUE v)
|
|
{
|
|
struct receive_block_data *data = (struct receive_block_data *)v;
|
|
rb_ractor_t *cr = data->cr;
|
|
|
|
if (!data->success) {
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
|
|
VM_ASSERT(basket_type_p(b, basket_type_reserved));
|
|
b->type.e = basket_type_deleted;
|
|
data->rq->reserved_cnt--;
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
}
|
|
|
|
rb_mutex_unlock(cr->receiving_mutex);
|
|
return Qnil;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
|
|
{
|
|
if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
|
|
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
unsigned int serial = (unsigned int)-1;
|
|
int index = 0;
|
|
struct rb_ractor_queue *rq = &cr->sync.recv_queue;
|
|
|
|
while (1) {
|
|
VALUE v = Qundef;
|
|
|
|
ractor_wait_receive(ec, cr, rq);
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
if (serial != rq->serial) {
|
|
serial = rq->serial;
|
|
index = 0;
|
|
}
|
|
|
|
// check newer version
|
|
for (int i=index; i<rq->cnt; i++) {
|
|
if (!ractor_queue_skip_p(cr, rq, i)) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i);
|
|
v = ractor_basket_value(b);
|
|
b->type.e = basket_type_reserved;
|
|
rq->reserved_cnt++;
|
|
index = i;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
if (!UNDEF_P(v)) {
|
|
struct receive_block_data data = {
|
|
.cr = cr,
|
|
.rq = rq,
|
|
.v = v,
|
|
.index = index,
|
|
.success = false,
|
|
};
|
|
|
|
VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
|
|
receive_if_ensure, (VALUE)&data);
|
|
|
|
if (!UNDEF_P(result)) return result;
|
|
index++;
|
|
}
|
|
|
|
RUBY_VM_CHECK_INTS(ec);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
|
|
{
|
|
bool closed = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (r->sync.incoming_port_closed) {
|
|
closed = true;
|
|
}
|
|
else {
|
|
ractor_queue_enq(r, &r->sync.recv_queue, b);
|
|
ractor_wakeup(r, wait_receiving, wakeup_by_send);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (closed) {
|
|
rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
|
|
}
|
|
}
|
|
|
|
// Ractor#send
|
|
|
|
static VALUE ractor_move(VALUE obj); // in this file
|
|
static VALUE ractor_copy(VALUE obj); // in this file
|
|
|
|
static void
|
|
ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype)
|
|
{
|
|
VALUE v;
|
|
enum rb_ractor_basket_type type;
|
|
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
type = basket_type_ref;
|
|
v = obj;
|
|
}
|
|
else if (!RTEST(move)) {
|
|
v = ractor_copy(obj);
|
|
type = basket_type_copy;
|
|
}
|
|
else {
|
|
type = basket_type_move;
|
|
v = ractor_move(obj);
|
|
}
|
|
|
|
*pobj = v;
|
|
*ptype = type;
|
|
}
|
|
|
|
static void
|
|
ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
basket->sender = cr->pub.self;
|
|
basket->p.send.exception = exc;
|
|
basket->p.send.v = obj;
|
|
}
|
|
|
|
static void
|
|
ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
|
|
{
|
|
VALUE v;
|
|
enum rb_ractor_basket_type type;
|
|
ractor_basket_prepare_contents(obj, move, &v, &type);
|
|
ractor_basket_fill_(cr, basket, v, exc);
|
|
basket->type.e = type;
|
|
}
|
|
|
|
static void
|
|
ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
|
|
{
|
|
ractor_basket_fill_(cr, basket, obj, exc);
|
|
basket->type.e = basket_type_will;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
|
|
{
|
|
struct rb_ractor_basket basket;
|
|
// TODO: Ractor local GC
|
|
ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false);
|
|
ractor_send_basket(ec, r, &basket);
|
|
return r->pub.self;
|
|
}
|
|
|
|
// Ractor#take
|
|
|
|
static bool
|
|
ractor_take_has_will(rb_ractor_t *r)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
return basket_type_p(&r->sync.will_basket, basket_type_will);
|
|
}
|
|
|
|
static bool
|
|
ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
if (ractor_take_has_will(r)) {
|
|
*b = r->sync.will_basket;
|
|
r->sync.will_basket.type.e = basket_type_none;
|
|
return true;
|
|
}
|
|
else {
|
|
VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none));
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
|
|
{
|
|
ASSERT_ractor_unlocking(r);
|
|
bool taken;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
taken = ractor_take_will(r, b);
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return taken;
|
|
}
|
|
|
|
static bool
|
|
ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
|
|
bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
|
|
{
|
|
struct rb_ractor_basket b = {
|
|
.type.e = basket_type_take_basket,
|
|
.sender = cr->pub.self,
|
|
.p = {
|
|
.take = {
|
|
.basket = take_basket,
|
|
.config = config,
|
|
},
|
|
},
|
|
};
|
|
bool closed = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (is_take && ractor_take_will(r, take_basket)) {
|
|
RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r));
|
|
}
|
|
else if (!is_take && ractor_take_has_will(r)) {
|
|
RUBY_DEBUG_LOG("has_will");
|
|
VM_ASSERT(config != NULL);
|
|
config->closed = true;
|
|
}
|
|
else if (r->sync.outgoing_port_closed) {
|
|
closed = true;
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r));
|
|
ractor_queue_enq(r, &r->sync.takers_queue, &b);
|
|
|
|
if (basket_none_p(take_basket)) {
|
|
ractor_wakeup(r, wait_yielding, wakeup_by_take);
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (closed) {
|
|
if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
|
|
return false;
|
|
}
|
|
else {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
|
|
{
|
|
struct rb_ractor_queue *ts = &r->sync.takers_queue;
|
|
bool deleted = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (r->sync.outgoing_port_closed) {
|
|
// ok
|
|
}
|
|
else {
|
|
for (int i=0; i<ts->cnt; i++) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
|
|
if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) {
|
|
ractor_queue_delete(r, ts, b);
|
|
deleted = true;
|
|
}
|
|
}
|
|
if (deleted) {
|
|
ractor_queue_compact(r, ts);
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return deleted;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
|
|
{
|
|
bool taken;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
|
|
taken = false;
|
|
}
|
|
else {
|
|
taken = true;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
if (taken) {
|
|
RUBY_DEBUG_LOG("taken");
|
|
if (basket_type_p(take_basket, basket_type_deleted)) {
|
|
VM_ASSERT(r->sync.outgoing_port_closed);
|
|
rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
|
|
}
|
|
return ractor_basket_accept(take_basket);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("not taken");
|
|
return Qundef;
|
|
}
|
|
}
|
|
|
|
|
|
#if VM_CHECK_MODE > 0
|
|
static bool
|
|
ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
|
|
{
|
|
bool ret = false;
|
|
struct rb_ractor_queue *ts = &r->sync.takers_queue;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
for (int i=0; i<ts->cnt; i++) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
|
|
if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
|
|
ret = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return ret;
|
|
}
|
|
#endif
|
|
|
|
static void
|
|
ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
|
|
{
|
|
retry:
|
|
if (basket_none_p(tb)) { // not yielded yet
|
|
if (!ractor_deregister_take(r, tb)) {
|
|
// not in r's takers queue
|
|
rb_thread_sleep(0);
|
|
goto retry;
|
|
}
|
|
}
|
|
else {
|
|
VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb));
|
|
}
|
|
}
|
|
|
|
struct take_wait_take_cleanup_data {
|
|
rb_ractor_t *r;
|
|
struct rb_ractor_basket *tb;
|
|
};
|
|
|
|
static void
|
|
ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
|
|
{
|
|
struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr;
|
|
ractor_take_cleanup(cr, data->r, data->tb);
|
|
}
|
|
|
|
static void
|
|
ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
|
|
{
|
|
struct take_wait_take_cleanup_data data = {
|
|
.r = r,
|
|
.tb = take_basket,
|
|
};
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
|
|
ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r));
|
|
VALUE v;
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
|
|
struct rb_ractor_basket take_basket = {
|
|
.type.e = basket_type_none,
|
|
.sender = 0,
|
|
};
|
|
|
|
ractor_register_take(cr, r, &take_basket, true, NULL, false);
|
|
|
|
while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) {
|
|
ractor_wait_take(ec, cr, r, &take_basket);
|
|
}
|
|
|
|
VM_ASSERT(!basket_none_p(&take_basket));
|
|
VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket));
|
|
|
|
return v;
|
|
}
|
|
|
|
// Ractor.yield
|
|
|
|
static bool
|
|
ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
|
|
{
|
|
ASSERT_ractor_locking(cr);
|
|
|
|
for (int i=0; i<rs->cnt; i++) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i);
|
|
if (basket_type_p(b, basket_type_take_basket) &&
|
|
basket_none_p(b->p.take.basket)) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static bool
|
|
ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
|
|
{
|
|
ASSERT_ractor_unlocking(cr);
|
|
struct rb_ractor_basket *first_tb = NULL;
|
|
bool found = false;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
while (ractor_queue_deq(cr, rs, b)) {
|
|
if (basket_type_p(b, basket_type_take_basket)) {
|
|
struct rb_ractor_basket *tb = b->p.take.basket;
|
|
|
|
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
|
|
found = true;
|
|
break;
|
|
}
|
|
else {
|
|
ractor_queue_enq(cr, rs, b);
|
|
if (first_tb == NULL) first_tb = tb;
|
|
struct rb_ractor_basket *head = ractor_queue_head(cr, rs);
|
|
VM_ASSERT(head != NULL);
|
|
if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) {
|
|
break; // loop detected
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
VM_ASSERT(basket_none_p(b));
|
|
}
|
|
}
|
|
|
|
if (found && b->p.take.config && !b->p.take.config->oneshot) {
|
|
ractor_queue_enq(cr, rs, b);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
return found;
|
|
}
|
|
|
|
static bool
|
|
ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will)
|
|
{
|
|
ASSERT_ractor_unlocking(cr);
|
|
|
|
struct rb_ractor_basket b;
|
|
|
|
if (ractor_deq_take_basket(cr, ts, &b)) {
|
|
VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
|
|
VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
|
|
|
|
rb_ractor_t *tr = RACTOR_PTR(b.sender);
|
|
struct rb_ractor_basket *tb = b.p.take.basket;
|
|
enum rb_ractor_basket_type type;
|
|
|
|
RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
|
|
|
|
if (is_will) {
|
|
type = basket_type_will;
|
|
}
|
|
else {
|
|
int state;
|
|
|
|
// begin
|
|
EC_PUSH_TAG(ec);
|
|
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
|
// TODO: Ractor local GC
|
|
ractor_basket_prepare_contents(obj, move, &obj, &type);
|
|
}
|
|
EC_POP_TAG();
|
|
// rescue
|
|
if (state) {
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
b.p.take.basket->type.e = basket_type_none;
|
|
ractor_queue_enq(cr, ts, &b);
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
EC_JUMP_TAG(ec, state);
|
|
}
|
|
}
|
|
|
|
RACTOR_LOCK(tr);
|
|
{
|
|
VM_ASSERT(basket_type_p(tb, basket_type_yielding));
|
|
// fill atomic
|
|
RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
|
|
ractor_basket_fill_(cr, tb, obj, exc);
|
|
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
|
|
rb_bug("unreachable");
|
|
}
|
|
ractor_wakeup(tr, wait_taking, wakeup_by_yield);
|
|
}
|
|
RACTOR_UNLOCK(tr);
|
|
|
|
return true;
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("no take basket");
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
|
|
{
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
while (!ractor_check_take_basket(cr, ts)) {
|
|
ractor_sleep(ec, cr, wait_yielding);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
|
|
{
|
|
struct rb_ractor_queue *ts = &cr->sync.takers_queue;
|
|
|
|
while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) {
|
|
ractor_wait_yield(ec, cr, ts);
|
|
}
|
|
|
|
return Qnil;
|
|
}
|
|
|
|
// Ractor::Selector
|
|
|
|
struct rb_ractor_selector {
|
|
rb_ractor_t *r;
|
|
struct rb_ractor_basket take_basket;
|
|
st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
|
|
};
|
|
|
|
static int
|
|
ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data)
|
|
{
|
|
const rb_ractor_t *r = (rb_ractor_t *)key;
|
|
rb_gc_mark(r->pub.self);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_selector_mark(void *ptr)
|
|
{
|
|
struct rb_ractor_selector *s = ptr;
|
|
|
|
if (s->take_ractors) {
|
|
st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0);
|
|
}
|
|
|
|
switch (s->take_basket.type.e) {
|
|
case basket_type_ref:
|
|
case basket_type_copy:
|
|
case basket_type_move:
|
|
case basket_type_will:
|
|
rb_gc_mark(s->take_basket.sender);
|
|
rb_gc_mark(s->take_basket.p.send.v);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
static int
|
|
ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data)
|
|
{
|
|
struct rb_ractor_selector *s = (struct rb_ractor_selector *)data;
|
|
struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val;
|
|
|
|
if (!config->closed) {
|
|
ractor_deregister_take((rb_ractor_t *)key, &s->take_basket);
|
|
}
|
|
free(config);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_selector_free(void *ptr)
|
|
{
|
|
struct rb_ractor_selector *s = ptr;
|
|
st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s);
|
|
st_free_table(s->take_ractors);
|
|
ruby_xfree(ptr);
|
|
}
|
|
|
|
static size_t
|
|
ractor_selector_memsize(const void *ptr)
|
|
{
|
|
const struct rb_ractor_selector *s = ptr;
|
|
return sizeof(struct rb_ractor_selector) +
|
|
st_memsize(s->take_ractors) +
|
|
s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config);
|
|
}
|
|
|
|
static const rb_data_type_t ractor_selector_data_type = {
|
|
"ractor/selector",
|
|
{
|
|
ractor_selector_mark,
|
|
ractor_selector_free,
|
|
ractor_selector_memsize,
|
|
NULL, // update
|
|
},
|
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
|
|
};
|
|
|
|
static struct rb_ractor_selector *
|
|
RACTOR_SELECTOR_PTR(VALUE selv)
|
|
{
|
|
VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
|
|
|
|
return (struct rb_ractor_selector *)DATA_PTR(selv);
|
|
}
|
|
|
|
// Ractor::Selector.new
|
|
|
|
static VALUE
|
|
ractor_selector_create(VALUE klass)
|
|
{
|
|
struct rb_ractor_selector *s;
|
|
VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s);
|
|
s->take_basket.type.e = basket_type_reserved;
|
|
s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config
|
|
return selv;
|
|
}
|
|
|
|
// Ractor::Selector#add(r)
|
|
|
|
static VALUE
|
|
ractor_selector_add(VALUE selv, VALUE rv)
|
|
{
|
|
if (!rb_ractor_p(rv)) {
|
|
rb_raise(rb_eArgError, "Not a ractor object");
|
|
}
|
|
|
|
rb_ractor_t *r = RACTOR_PTR(rv);
|
|
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
|
|
if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
|
|
rb_raise(rb_eArgError, "already added");
|
|
}
|
|
|
|
struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config));
|
|
VM_ASSERT(config != NULL);
|
|
config->closed = false;
|
|
config->oneshot = false;
|
|
|
|
if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
|
|
st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
|
|
}
|
|
|
|
return rv;
|
|
}
|
|
|
|
// Ractor::Selector#remove(r)
|
|
|
|
static VALUE
|
|
ractor_selector_remove(VALUE selv, VALUE rv)
|
|
{
|
|
if (!rb_ractor_p(rv)) {
|
|
rb_raise(rb_eArgError, "Not a ractor object");
|
|
}
|
|
|
|
rb_ractor_t *r = RACTOR_PTR(rv);
|
|
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
|
|
RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));
|
|
|
|
if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
|
|
rb_raise(rb_eArgError, "not added yet");
|
|
}
|
|
|
|
ractor_deregister_take(r, &s->take_basket);
|
|
struct rb_ractor_selector_take_config *config;
|
|
st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config);
|
|
free(config);
|
|
|
|
return rv;
|
|
}
|
|
|
|
// Ractor::Selector#clear
|
|
|
|
struct ractor_selector_clear_data {
|
|
VALUE selv;
|
|
rb_execution_context_t *ec;
|
|
};
|
|
|
|
static int
|
|
ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data)
|
|
{
|
|
VALUE selv = (VALUE)data;
|
|
rb_ractor_t *r = (rb_ractor_t *)key;
|
|
ractor_selector_remove(selv, r->pub.self);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector_clear(VALUE selv)
|
|
{
|
|
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
|
|
st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv);
|
|
st_clear(s->take_ractors);
|
|
return selv;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector_empty_p(VALUE selv)
|
|
{
|
|
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse;
|
|
}
|
|
|
|
static int
|
|
ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)key;
|
|
struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat;
|
|
int ret;
|
|
|
|
if (!basket_none_p(tb)) {
|
|
RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e));
|
|
return ST_STOP;
|
|
}
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (basket_type_p(&r->sync.will_basket, basket_type_will)) {
|
|
RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r));
|
|
|
|
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) {
|
|
ractor_take_will(r, tb);
|
|
ret = ST_STOP;
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e));
|
|
ret = ST_CONTINUE;
|
|
}
|
|
}
|
|
else if (r->sync.outgoing_port_closed) {
|
|
RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r));
|
|
|
|
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) {
|
|
tb->sender = r->pub.self;
|
|
ret = ST_STOP;
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e));
|
|
ret = ST_CONTINUE;
|
|
}
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
|
|
ractor_wakeup(r, wait_yielding, wakeup_by_take);
|
|
ret = ST_CONTINUE;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return ret;
|
|
}
|
|
|
|
// Ractor::Selector#wait
|
|
|
|
static void
|
|
ractor_selector_wait_cleaup(rb_ractor_t *cr, void *ptr)
|
|
{
|
|
struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0);
|
|
// if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately.
|
|
tb->type.e = basket_type_reserved;
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
|
|
{
|
|
rb_execution_context_t *ec = GET_EC();
|
|
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
struct rb_ractor_basket *tb = &s->take_basket;
|
|
struct rb_ractor_basket taken_basket;
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
bool do_receive = !!RTEST(do_receivev);
|
|
bool do_yield = !!RTEST(do_yieldv);
|
|
VALUE ret_v, ret_r;
|
|
enum rb_ractor_wait_status wait_status;
|
|
struct rb_ractor_queue *rq = &cr->sync.recv_queue;
|
|
struct rb_ractor_queue *ts = &cr->sync.takers_queue;
|
|
|
|
RUBY_DEBUG_LOG("start");
|
|
|
|
retry:
|
|
RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
|
|
|
|
// setup wait_status
|
|
wait_status = wait_none;
|
|
if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
|
|
if (do_receive) wait_status |= wait_receiving;
|
|
if (do_yield) wait_status |= wait_yielding;
|
|
|
|
RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
|
|
|
|
if (wait_status == wait_none) {
|
|
rb_raise(rb_eRactorError, "no taking ractors");
|
|
}
|
|
|
|
// check recv_queue
|
|
if (do_receive && (ret_v = ractor_try_receive(ec, cr, rq)) != Qundef) {
|
|
ret_r = ID2SYM(rb_intern("receive"));
|
|
goto success;
|
|
}
|
|
|
|
// check takers
|
|
if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
|
|
ret_v = Qnil;
|
|
ret_r = ID2SYM(rb_intern("yield"));
|
|
goto success;
|
|
}
|
|
|
|
// check take_basket
|
|
VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
|
|
s->take_basket.type.e = basket_type_none;
|
|
// kick all take target ractors
|
|
st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
retry_waiting:
|
|
while (1) {
|
|
if (!basket_none_p(tb)) {
|
|
RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
|
|
tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
|
|
break;
|
|
}
|
|
if (do_receive && !ractor_queue_empty_p(cr, rq)) {
|
|
RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
|
|
break;
|
|
}
|
|
if (do_yield && ractor_check_take_basket(cr, ts)) {
|
|
RUBY_DEBUG_LOG("can yield");
|
|
break;
|
|
}
|
|
|
|
ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
|
|
}
|
|
|
|
taken_basket = *tb;
|
|
|
|
// ensure
|
|
// tb->type.e = basket_type_reserved # do it atomic in the following code
|
|
if (taken_basket.type.e == basket_type_yielding ||
|
|
RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
|
|
|
|
if (basket_type_p(tb, basket_type_yielding)) {
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
{
|
|
rb_thread_sleep(0);
|
|
}
|
|
RACTOR_LOCK_SELF(cr);
|
|
}
|
|
goto retry_waiting;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
// check the taken resutl
|
|
switch (taken_basket.type.e) {
|
|
case basket_type_none:
|
|
VM_ASSERT(do_receive || do_yield);
|
|
goto retry;
|
|
case basket_type_yielding:
|
|
rb_bug("unreachable");
|
|
case basket_type_deleted: {
|
|
ractor_selector_remove(selv, taken_basket.sender);
|
|
|
|
rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
|
|
if (ractor_take_will_lock(r, &taken_basket)) {
|
|
RUBY_DEBUG_LOG("has_will");
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("no will");
|
|
// rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
|
|
// remove and retry wait
|
|
goto retry;
|
|
}
|
|
break;
|
|
}
|
|
case basket_type_will:
|
|
// no more messages
|
|
ractor_selector_remove(selv, taken_basket.sender);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
|
|
|
|
ret_v = ractor_basket_accept(&taken_basket);
|
|
ret_r = taken_basket.sender;
|
|
success:
|
|
return rb_ary_new_from_args(2, ret_r, ret_v);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
|
|
{
|
|
VALUE options;
|
|
ID keywords[3];
|
|
VALUE values[3];
|
|
|
|
keywords[0] = rb_intern("receive");
|
|
keywords[1] = rb_intern("yield_value");
|
|
keywords[2] = rb_intern("move");
|
|
|
|
rb_scan_args(argc, argv, "0:", &options);
|
|
rb_get_kwargs(options, keywords, 0, numberof(values), values);
|
|
return ractor_selector__wait(selector,
|
|
values[0] == Qundef ? Qfalse : RTEST(values[0]),
|
|
values[1] != Qundef, values[1], values[2]);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
|
|
{
|
|
VALUE selector = ractor_selector_create(klass);
|
|
|
|
for (int i=0; i<argc; i++) {
|
|
ractor_selector_add(selector, ractors[i]);
|
|
}
|
|
|
|
return selector;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move)
|
|
{
|
|
VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector);
|
|
VALUE result;
|
|
int state;
|
|
|
|
EC_PUSH_TAG(ec);
|
|
if ((state = EC_EXEC_TAG() == TAG_NONE)) {
|
|
result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move);
|
|
}
|
|
else {
|
|
// ensure
|
|
ractor_selector_clear(selector);
|
|
|
|
// jump
|
|
EC_JUMP_TAG(ec, state);
|
|
}
|
|
EC_POP_TAG();
|
|
|
|
RB_GC_GUARD(ractors);
|
|
return result;
|
|
}
|
|
|
|
// Ractor#close_incoming
|
|
|
|
static VALUE
|
|
ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
VALUE prev;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (!r->sync.incoming_port_closed) {
|
|
prev = Qfalse;
|
|
r->sync.incoming_port_closed = true;
|
|
if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
|
|
VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
|
|
RUBY_DEBUG_LOG("cancel receiving");
|
|
}
|
|
}
|
|
else {
|
|
prev = Qtrue;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
return prev;
|
|
}
|
|
|
|
// Ractor#close_outgoing
|
|
|
|
static VALUE
|
|
ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
VALUE prev;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
struct rb_ractor_queue *ts = &r->sync.takers_queue;
|
|
rb_ractor_t *tr;
|
|
struct rb_ractor_basket b;
|
|
|
|
if (!r->sync.outgoing_port_closed) {
|
|
prev = Qfalse;
|
|
r->sync.outgoing_port_closed = true;
|
|
}
|
|
else {
|
|
VM_ASSERT(ractor_queue_empty_p(r, ts));
|
|
prev = Qtrue;
|
|
}
|
|
|
|
// wakeup all taking ractors
|
|
while (ractor_queue_deq(r, ts, &b)) {
|
|
if (basket_type_p(&b, basket_type_take_basket)) {
|
|
tr = RACTOR_PTR(b.sender);
|
|
struct rb_ractor_basket *tb = b.p.take.basket;
|
|
|
|
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
|
|
b.p.take.basket->sender = r->pub.self;
|
|
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) {
|
|
rb_bug("unreachable");
|
|
}
|
|
RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender)));
|
|
}
|
|
|
|
if (b.p.take.config) {
|
|
b.p.take.config->closed = true;
|
|
}
|
|
|
|
// TODO: deadlock-able?
|
|
RACTOR_LOCK(tr);
|
|
{
|
|
ractor_wakeup(tr, wait_taking, wakeup_by_close);
|
|
}
|
|
RACTOR_UNLOCK(tr);
|
|
}
|
|
}
|
|
|
|
// raising yielding Ractor
|
|
ractor_wakeup(r, wait_yielding, wakeup_by_close);
|
|
|
|
VM_ASSERT(ractor_queue_empty_p(r, ts));
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
return prev;
|
|
}
|
|
|
|
// creation/termination
|
|
|
|
static uint32_t
|
|
ractor_next_id(void)
|
|
{
|
|
uint32_t id;
|
|
|
|
id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1);
|
|
|
|
return id;
|
|
}
|
|
|
|
static void
|
|
vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
|
|
{
|
|
RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt);
|
|
VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P());
|
|
|
|
ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node);
|
|
vm->ractor.cnt++;
|
|
}
|
|
|
|
static void
|
|
cancel_single_ractor_mode(void)
|
|
{
|
|
// enable multi-ractor mode
|
|
RUBY_DEBUG_LOG("enable multi-ractor mode");
|
|
|
|
VALUE was_disabled = rb_gc_enable();
|
|
|
|
rb_gc_start();
|
|
|
|
if (was_disabled) {
|
|
rb_gc_disable();
|
|
}
|
|
|
|
ruby_single_main_ractor = NULL;
|
|
}
|
|
|
|
static void
|
|
vm_insert_ractor(rb_vm_t *vm, rb_ractor_t *r)
|
|
{
|
|
VM_ASSERT(ractor_status_p(r, ractor_created));
|
|
|
|
if (rb_multi_ractor_p()) {
|
|
RB_VM_LOCK();
|
|
{
|
|
vm_insert_ractor0(vm, r, false);
|
|
vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
else {
|
|
if (vm->ractor.cnt == 0) {
|
|
// main ractor
|
|
vm_insert_ractor0(vm, r, true);
|
|
ractor_status_set(r, ractor_blocking);
|
|
ractor_status_set(r, ractor_running);
|
|
}
|
|
else {
|
|
cancel_single_ractor_mode();
|
|
vm_insert_ractor0(vm, r, true);
|
|
vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(ractor_status_p(cr, ractor_running));
|
|
VM_ASSERT(vm->ractor.cnt > 1);
|
|
VM_ASSERT(cr->threads.cnt == 1);
|
|
|
|
RB_VM_LOCK();
|
|
{
|
|
RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
|
|
vm->ractor.cnt, vm->ractor.sync.terminate_waiting);
|
|
|
|
VM_ASSERT(vm->ractor.cnt > 0);
|
|
ccan_list_del(&cr->vmlr_node);
|
|
|
|
if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) {
|
|
rb_native_cond_signal(&vm->ractor.sync.terminate_cond);
|
|
}
|
|
vm->ractor.cnt--;
|
|
|
|
/* Clear the cached freelist to prevent a memory leak. */
|
|
rb_gc_ractor_newobj_cache_clear(&cr->newobj_cache);
|
|
|
|
ractor_status_set(cr, ractor_terminated);
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
|
|
static VALUE
|
|
ractor_alloc(VALUE klass)
|
|
{
|
|
rb_ractor_t *r;
|
|
VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r);
|
|
FL_SET_RAW(rv, RUBY_FL_SHAREABLE);
|
|
r->pub.self = rv;
|
|
VM_ASSERT(ractor_status_p(r, ractor_created));
|
|
return rv;
|
|
}
|
|
|
|
rb_ractor_t *
|
|
rb_ractor_main_alloc(void)
|
|
{
|
|
rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t));
|
|
if (r == NULL) {
|
|
fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
MEMZERO(r, rb_ractor_t, 1);
|
|
r->pub.id = ++ractor_last_id;
|
|
r->loc = Qnil;
|
|
r->name = Qnil;
|
|
r->pub.self = Qnil;
|
|
ruby_single_main_ractor = r;
|
|
|
|
return r;
|
|
}
|
|
|
|
#if defined(HAVE_WORKING_FORK)
// Resets the ractor bookkeeping after fork: the ractor of the surviving
// thread `th` becomes the single main ractor again, in the `created`
// state, with `th` as its only living thread.
void
rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
{
    rb_ractor_t *survivor = th->ractor;

    // initialize as a main ractor
    vm->ractor.cnt = 0;
    vm->ractor.blocking_cnt = 0;
    ruby_single_main_ractor = survivor;
    survivor->status_ = ractor_created;

    rb_ractor_living_threads_init(th->ractor);
    rb_ractor_living_threads_insert(th->ractor, th);

    VM_ASSERT(vm->ractor.blocking_cnt == 0);
    VM_ASSERT(vm->ractor.cnt == 1);
}
#endif
|
|
|
|
void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
|
|
|
|
void
|
|
rb_ractor_living_threads_init(rb_ractor_t *r)
|
|
{
|
|
ccan_list_head_init(&r->threads.set);
|
|
r->threads.cnt = 0;
|
|
r->threads.blocking_cnt = 0;
|
|
}
|
|
|
|
static void
|
|
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
|
|
{
|
|
ractor_queue_setup(&r->sync.recv_queue);
|
|
ractor_queue_setup(&r->sync.takers_queue);
|
|
rb_native_mutex_initialize(&r->sync.lock);
|
|
rb_native_cond_initialize(&r->barrier_wait_cond);
|
|
|
|
#ifdef RUBY_THREAD_WIN32_H
|
|
rb_native_cond_initialize(&r->sync.cond);
|
|
rb_native_cond_initialize(&r->barrier_wait_cond);
|
|
#endif
|
|
|
|
// thread management
|
|
rb_thread_sched_init(&r->threads.sched, false);
|
|
rb_ractor_living_threads_init(r);
|
|
|
|
// naming
|
|
if (!NIL_P(name)) {
|
|
rb_encoding *enc;
|
|
StringValueCStr(name);
|
|
enc = rb_enc_get(name);
|
|
if (!rb_enc_asciicompat(enc)) {
|
|
rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
|
|
rb_enc_name(enc));
|
|
}
|
|
name = rb_str_new_frozen(name);
|
|
}
|
|
r->name = name;
|
|
r->loc = loc;
|
|
}
|
|
|
|
void
|
|
rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th)
|
|
{
|
|
r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
|
|
FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE);
|
|
ractor_init(r, Qnil, Qnil);
|
|
r->threads.main = th;
|
|
rb_ractor_living_threads_insert(r, th);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block)
|
|
{
|
|
VALUE rv = ractor_alloc(self);
|
|
rb_ractor_t *r = RACTOR_PTR(rv);
|
|
ractor_init(r, name, loc);
|
|
|
|
// can block here
|
|
r->pub.id = ractor_next_id();
|
|
RUBY_DEBUG_LOG("r:%u", r->pub.id);
|
|
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
r->verbose = cr->verbose;
|
|
r->debug = cr->debug;
|
|
|
|
rb_yjit_before_ractor_spawn();
|
|
rb_rjit_before_ractor_spawn();
|
|
rb_thread_create_ractor(r, args, block);
|
|
|
|
RB_GC_GUARD(rv);
|
|
return rv;
|
|
}
|
|
|
|
static void
|
|
ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
|
|
{
|
|
if (cr->sync.outgoing_port_closed) {
|
|
return;
|
|
}
|
|
|
|
ASSERT_ractor_unlocking(cr);
|
|
|
|
struct rb_ractor_queue *ts = &cr->sync.takers_queue;
|
|
|
|
retry:
|
|
if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
|
|
// OK.
|
|
}
|
|
else {
|
|
bool retry = false;
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
if (!ractor_check_take_basket(cr, ts)) {
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
RUBY_DEBUG_LOG("leave a will");
|
|
ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("rare timing!");
|
|
retry = true; // another ractor is waiting for the yield.
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
|
|
if (retry) goto retry;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ractor_yield_atexit(ec, cr, result, false);
|
|
}
|
|
|
|
void
|
|
rb_ractor_atexit_exception(rb_execution_context_t *ec)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ractor_yield_atexit(ec, cr, ec->errinfo, true);
|
|
}
|
|
|
|
void
|
|
rb_ractor_teardown(rb_execution_context_t *ec)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ractor_close_incoming(ec, cr);
|
|
ractor_close_outgoing(ec, cr);
|
|
|
|
// sync with rb_ractor_terminate_interrupt_main_thread()
|
|
RB_VM_LOCK_ENTER();
|
|
{
|
|
VM_ASSERT(cr->threads.main != NULL);
|
|
cr->threads.main = NULL;
|
|
}
|
|
RB_VM_LOCK_LEAVE();
|
|
}
|
|
|
|
void
|
|
rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
|
|
{
|
|
for (int i=0; i<len; i++) {
|
|
ptr[i] = ractor_receive(ec, r);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args)
|
|
{
|
|
int len = RARRAY_LENINT(args);
|
|
for (int i=0; i<len; i++) {
|
|
ractor_send(ec, r, RARRAY_AREF(args, i), false);
|
|
}
|
|
}
|
|
|
|
bool
|
|
rb_ractor_main_p_(void)
|
|
{
|
|
VM_ASSERT(rb_multi_ractor_p());
|
|
rb_execution_context_t *ec = GET_EC();
|
|
return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor;
|
|
}
|
|
|
|
bool
|
|
rb_obj_is_main_ractor(VALUE gv)
|
|
{
|
|
if (!rb_ractor_p(gv)) return false;
|
|
rb_ractor_t *r = DATA_PTR(gv);
|
|
return r == GET_VM()->ractor.main_ractor;
|
|
}
|
|
|
|
int
|
|
rb_ractor_living_thread_num(const rb_ractor_t *r)
|
|
{
|
|
return r->threads.cnt;
|
|
}
|
|
|
|
// only for current ractor
|
|
VALUE
|
|
rb_ractor_thread_list(void)
|
|
{
|
|
rb_ractor_t *r = GET_RACTOR();
|
|
rb_thread_t *th = 0;
|
|
VALUE ary = rb_ary_new();
|
|
|
|
ccan_list_for_each(&r->threads.set, th, lt_node) {
|
|
switch (th->status) {
|
|
case THREAD_RUNNABLE:
|
|
case THREAD_STOPPED:
|
|
case THREAD_STOPPED_FOREVER:
|
|
rb_ary_push(ary, th->self);
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
return ary;
|
|
}
|
|
|
|
void
|
|
rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th)
|
|
{
|
|
VM_ASSERT(th != NULL);
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt);
|
|
ccan_list_add_tail(&r->threads.set, &th->lt_node);
|
|
r->threads.cnt++;
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
// first thread for a ractor
|
|
if (r->threads.cnt == 1) {
|
|
VM_ASSERT(ractor_status_p(r, ractor_created));
|
|
vm_insert_ractor(th->vm, r);
|
|
}
|
|
}
|
|
|
|
static void
|
|
vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line)
|
|
{
|
|
ractor_status_set(r, ractor_blocking);
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d++", vm->ractor.blocking_cnt);
|
|
vm->ractor.blocking_cnt++;
|
|
VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
|
|
}
|
|
|
|
void
|
|
rb_vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
ASSERT_vm_locking();
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
vm_ractor_blocking_cnt_inc(vm, cr, file, line);
|
|
}
|
|
|
|
void
|
|
rb_vm_ractor_blocking_cnt_dec(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
ASSERT_vm_locking();
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d--", vm->ractor.blocking_cnt);
|
|
VM_ASSERT(vm->ractor.blocking_cnt > 0);
|
|
vm->ractor.blocking_cnt--;
|
|
|
|
ractor_status_set(cr, ractor_running);
|
|
}
|
|
|
|
static void
|
|
ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const char *file, int line)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
RUBY_DEBUG_LOG2(file, line,
|
|
"cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
|
|
cr->threads.cnt, cr->threads.blocking_cnt,
|
|
GET_VM()->ractor.cnt, GET_VM()->ractor.blocking_cnt);
|
|
|
|
VM_ASSERT(cr->threads.cnt >= cr->threads.blocking_cnt + 1);
|
|
|
|
if (remained_thread_cnt > 0 &&
|
|
// will be block
|
|
cr->threads.cnt == cr->threads.blocking_cnt + 1) {
|
|
// change ractor status: running -> blocking
|
|
rb_vm_t *vm = GET_VM();
|
|
ASSERT_vm_unlocking();
|
|
|
|
RB_VM_LOCK();
|
|
{
|
|
rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line);
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
}
|
|
|
|
void rb_threadptr_remove(rb_thread_t *th);
|
|
|
|
void
|
|
rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
|
|
ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
|
|
|
|
rb_threadptr_remove(th);
|
|
|
|
if (cr->threads.cnt == 1) {
|
|
vm_remove_ractor(th->vm, cr);
|
|
}
|
|
else {
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
ccan_list_del(&th->lt_node);
|
|
cr->threads.cnt--;
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_blocking_threads_inc(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
RUBY_DEBUG_LOG2(file, line, "cr->threads.blocking_cnt:%d++", cr->threads.blocking_cnt);
|
|
|
|
VM_ASSERT(cr->threads.cnt > 0);
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
ractor_check_blocking(cr, cr->threads.cnt, __FILE__, __LINE__);
|
|
cr->threads.blocking_cnt++;
|
|
}
|
|
|
|
void
|
|
rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
RUBY_DEBUG_LOG2(file, line,
|
|
"r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
|
|
cr->threads.blocking_cnt, cr->threads.cnt);
|
|
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
if (cr->threads.cnt == cr->threads.blocking_cnt) {
|
|
rb_vm_t *vm = GET_VM();
|
|
|
|
RB_VM_LOCK_ENTER();
|
|
{
|
|
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
|
|
}
|
|
RB_VM_LOCK_LEAVE();
|
|
}
|
|
|
|
cr->threads.blocking_cnt--;
|
|
}
|
|
|
|
void
|
|
rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t *r)
|
|
{
|
|
VM_ASSERT(r != GET_RACTOR());
|
|
ASSERT_ractor_unlocking(r);
|
|
ASSERT_vm_locking();
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (ractor_status_p(r, ractor_running)) {
|
|
rb_execution_context_t *ec = r->threads.running_ec;
|
|
if (ec) {
|
|
RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec);
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
|
|
void
|
|
rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
|
|
{
|
|
VM_ASSERT(r != GET_RACTOR());
|
|
ASSERT_ractor_unlocking(r);
|
|
ASSERT_vm_locking();
|
|
|
|
rb_thread_t *main_th = r->threads.main;
|
|
if (main_th) {
|
|
if (main_th->status != THREAD_KILLED) {
|
|
RUBY_VM_SET_TERMINATE_INTERRUPT(main_th->ec);
|
|
rb_threadptr_interrupt(main_th);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("killed (%p)", (void *)main_th);
|
|
}
|
|
}
|
|
}
|
|
|
|
void rb_thread_terminate_all(rb_thread_t *th); // thread.c
|
|
|
|
static void
|
|
ractor_terminal_interrupt_all(rb_vm_t *vm)
|
|
{
|
|
if (vm->ractor.cnt > 1) {
|
|
// send terminate notification to all ractors
|
|
rb_ractor_t *r = 0;
|
|
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
|
|
if (r != vm->ractor.main_ractor) {
|
|
RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r));
|
|
rb_ractor_terminate_interrupt_main_thread(r);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void rb_add_running_thread(rb_thread_t *th);
|
|
void rb_del_running_thread(rb_thread_t *th);
|
|
|
|
void
|
|
rb_ractor_terminate_all(void)
|
|
{
|
|
rb_vm_t *vm = GET_VM();
|
|
rb_ractor_t *cr = vm->ractor.main_ractor;
|
|
|
|
RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt);
|
|
|
|
VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
|
|
|
|
if (vm->ractor.cnt > 1) {
|
|
RB_VM_LOCK();
|
|
{
|
|
ractor_terminal_interrupt_all(vm); // kill all ractors
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
|
|
|
|
RB_VM_LOCK();
|
|
{
|
|
while (vm->ractor.cnt > 1) {
|
|
RUBY_DEBUG_LOG("terminate_waiting:%d", vm->ractor.sync.terminate_waiting);
|
|
vm->ractor.sync.terminate_waiting = true;
|
|
|
|
// wait for 1sec
|
|
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
|
|
rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
|
|
rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
|
|
rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
|
|
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
|
|
|
|
ractor_terminal_interrupt_all(vm);
|
|
}
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
|
|
rb_execution_context_t *
|
|
rb_vm_main_ractor_ec(rb_vm_t *vm)
|
|
{
|
|
return vm->ractor.main_ractor->threads.running_ec;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_moved_missing(int argc, VALUE *argv, VALUE self)
|
|
{
|
|
rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
|
|
}
|
|
|
|
#ifndef USE_RACTOR_SELECTOR
|
|
#define USE_RACTOR_SELECTOR 0
|
|
#endif
|
|
|
|
RUBY_SYMBOL_EXPORT_BEGIN
|
|
void rb_init_ractor_selector(void);
|
|
RUBY_SYMBOL_EXPORT_END
|
|
|
|
void
|
|
rb_init_ractor_selector(void)
|
|
{
|
|
rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
|
|
rb_undef_alloc_func(rb_cRactorSelector);
|
|
|
|
rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
|
|
rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
|
|
rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
|
|
rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
|
|
rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
|
|
rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1);
|
|
rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4);
|
|
}
|
|
|
|
/*
|
|
* Document-class: Ractor::ClosedError
|
|
*
|
|
* Raised when an attempt is made to send a message to a closed port,
|
|
* or to retrieve a message from a closed and empty port.
|
|
* Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
|
|
* and are closed implicitly when a Ractor terminates.
|
|
*
|
|
* r = Ractor.new { sleep(500) }
|
|
* r.close_outgoing
|
|
* r.take # Ractor::ClosedError
|
|
*
|
|
* ClosedError is a descendant of StopIteration, so the closing of the ractor will break
|
|
* the loops without propagating the error:
|
|
*
|
|
* r = Ractor.new do
|
|
* loop do
|
|
* msg = receive # raises ClosedError and loop traps it
|
|
* puts "Received: #{msg}"
|
|
* end
|
|
* puts "loop exited"
|
|
* end
|
|
*
|
|
* 3.times{|i| r << i}
|
|
* r.close_incoming
|
|
* r.take
|
|
* puts "Continue successfully"
|
|
*
|
|
* This will print:
|
|
*
|
|
* Received: 0
|
|
* Received: 1
|
|
* Received: 2
|
|
* loop exited
|
|
* Continue successfully
|
|
*/
|
|
|
|
/*
|
|
* Document-class: Ractor::RemoteError
|
|
*
|
|
* Raised by Ractor#take if there was an uncaught exception in the Ractor.
|
|
* Its +cause+ will contain the original exception, and +ractor+ is the original ractor
|
|
* it was raised in.
|
|
*
|
|
* r = Ractor.new { raise "Something weird happened" }
|
|
*
|
|
* begin
|
|
* r.take
|
|
* rescue => e
|
|
* p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
|
|
* p e.ractor == r # => true
|
|
* p e.cause # => #<RuntimeError: Something weird happened>
|
|
* end
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* Document-class: Ractor::MovedError
|
|
*
|
|
* Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
|
|
*
|
|
* r = Ractor.new { sleep }
|
|
*
|
|
* ary = [1, 2, 3]
|
|
* r.send(ary, move: true)
|
|
* ary.inspect
|
|
* # Ractor::MovedError (can not send any methods to a moved object)
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* Document-class: Ractor::MovedObject
|
|
*
|
|
* A special object which replaces any value that was moved to another ractor in Ractor#send
|
|
* or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
|
|
*
|
|
* r = Ractor.new { receive }
|
|
*
|
|
* ary = [1, 2, 3]
|
|
* r.send(ary, move: true)
|
|
* p Ractor::MovedObject === ary
|
|
* # => true
|
|
* ary.inspect
|
|
* # Ractor::MovedError (can not send any methods to a moved object)
|
|
*/
|
|
|
|
// Main docs are in ractor.rb, but without this clause there are weird artifacts
|
|
// in their rendering.
|
|
/*
|
|
* Document-class: Ractor
|
|
*
|
|
*/
|
|
|
|
void
|
|
Init_Ractor(void)
|
|
{
|
|
rb_cRactor = rb_define_class("Ractor", rb_cObject);
|
|
rb_undef_alloc_func(rb_cRactor);
|
|
|
|
rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError);
|
|
rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError);
|
|
rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError);
|
|
rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError);
|
|
rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration);
|
|
rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError);
|
|
|
|
rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject);
|
|
rb_undef_alloc_func(rb_cRactorMovedObject);
|
|
rb_define_method(rb_cRactorMovedObject, "method_missing", ractor_moved_missing, -1);
|
|
|
|
// override methods defined in BasicObject
|
|
rb_define_method(rb_cRactorMovedObject, "__send__", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "!", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "==", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "!=", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "__id__", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
|
|
|
|
#if USE_RACTOR_SELECTOR
|
|
rb_init_ractor_selector();
|
|
#endif
|
|
}
|
|
|
|
void
|
|
rb_ractor_dump(void)
|
|
{
|
|
rb_vm_t *vm = GET_VM();
|
|
rb_ractor_t *r = 0;
|
|
|
|
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
|
|
if (r != vm->ractor.main_ractor) {
|
|
fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_));
|
|
}
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_stdin(void)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
return rb_stdin;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
return cr->r_stdin;
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_stdout(void)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
return rb_stdout;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
return cr->r_stdout;
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_stderr(void)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
return rb_stderr;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
return cr->r_stderr;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_stdin_set(VALUE in)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
rb_stdin = in;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_stdout_set(VALUE out)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
rb_stdout = out;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_stderr_set(VALUE err)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
rb_stderr = err;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err);
|
|
}
|
|
}
|
|
|
|
rb_hook_list_t *
|
|
rb_ractor_hooks(rb_ractor_t *cr)
|
|
{
|
|
return &cr->pub.hooks;
|
|
}
|
|
|
|
/// traverse function
|
|
|
|
// 2: stop search
|
|
// 1: skip child
|
|
// 0: continue
|
|
|
|
// Verdict returned by the enter/leave/final callbacks of the object traversals.
enum obj_traverse_iterator_result {
|
|
traverse_cont, // 0: keep going (descend into the object's children)
|
|
traverse_skip, // 1: do not descend into this object's children
|
|
traverse_stop, // 2: abort the whole traversal
|
|
};
|
|
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func)(VALUE obj);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func)(VALUE obj);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func)(VALUE obj);
|
|
|
|
static enum obj_traverse_iterator_result null_leave(VALUE obj);
|
|
|
|
// State shared by one obj_traverse_i() walk.
struct obj_traverse_data {
|
|
rb_obj_traverse_enter_func enter_func; // called on an object before its children are visited
|
|
rb_obj_traverse_leave_func leave_func; // called on an object after its children were visited
|
|
|
|
st_table *rec; // set of already visited objects; created lazily by obj_traverse_rec()
|
|
VALUE rec_hash; // identity hash object whose st_table `rec` points to
|
|
};
|
|
|
|
|
|
// Argument handed to the per-child iterator callbacks of obj_traverse_i().
struct obj_traverse_callback_data {
|
|
bool stop; // set to true by a callback when the traversal must be aborted
|
|
struct obj_traverse_data *data; // state of the enclosing traversal
|
|
};
|
|
|
|
static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data);
|
|
|
|
static int
|
|
obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr)
|
|
{
|
|
struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
|
|
|
|
if (obj_traverse_i(key, d->data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
|
|
if (obj_traverse_i(val, d->data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
obj_traverse_reachable_i(VALUE obj, void *ptr)
|
|
{
|
|
struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
|
|
|
|
if (obj_traverse_i(obj, d->data)) {
|
|
d->stop = true;
|
|
}
|
|
}
|
|
|
|
static struct st_table *
|
|
obj_traverse_rec(struct obj_traverse_data *data)
|
|
{
|
|
if (UNLIKELY(!data->rec)) {
|
|
data->rec_hash = rb_ident_hash_new();
|
|
data->rec = RHASH_ST_TABLE(data->rec_hash);
|
|
}
|
|
return data->rec;
|
|
}
|
|
|
|
static int
|
|
obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr)
|
|
{
|
|
struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
|
|
|
|
if (obj_traverse_i(val, d->data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static int
|
|
obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
|
|
{
|
|
if (RB_SPECIAL_CONST_P(obj)) return 0;
|
|
|
|
switch (data->enter_func(obj)) {
|
|
case traverse_cont: break;
|
|
case traverse_skip: return 0; // skip children
|
|
case traverse_stop: return 1; // stop search
|
|
}
|
|
|
|
if (UNLIKELY(st_insert(obj_traverse_rec(data), obj, 1))) {
|
|
// already traversed
|
|
return 0;
|
|
}
|
|
|
|
struct obj_traverse_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
};
|
|
rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d);
|
|
if (d.stop) return 1;
|
|
|
|
switch (BUILTIN_TYPE(obj)) {
|
|
// no child node
|
|
case T_STRING:
|
|
case T_FLOAT:
|
|
case T_BIGNUM:
|
|
case T_REGEXP:
|
|
case T_FILE:
|
|
case T_SYMBOL:
|
|
case T_MATCH:
|
|
break;
|
|
|
|
case T_OBJECT:
|
|
/* Instance variables already traversed. */
|
|
break;
|
|
|
|
case T_ARRAY:
|
|
{
|
|
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
|
|
VALUE e = rb_ary_entry(obj, i);
|
|
if (obj_traverse_i(e, data)) return 1;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_HASH:
|
|
{
|
|
if (obj_traverse_i(RHASH_IFNONE(obj), data)) return 1;
|
|
|
|
struct obj_traverse_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
};
|
|
rb_hash_foreach(obj, obj_hash_traverse_i, (VALUE)&d);
|
|
if (d.stop) return 1;
|
|
}
|
|
break;
|
|
|
|
case T_STRUCT:
|
|
{
|
|
long len = RSTRUCT_LEN(obj);
|
|
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
|
|
|
|
for (long i=0; i<len; i++) {
|
|
if (obj_traverse_i(ptr[i], data)) return 1;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_RATIONAL:
|
|
if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1;
|
|
if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1;
|
|
break;
|
|
case T_COMPLEX:
|
|
if (obj_traverse_i(RCOMPLEX(obj)->real, data)) return 1;
|
|
if (obj_traverse_i(RCOMPLEX(obj)->imag, data)) return 1;
|
|
break;
|
|
|
|
case T_DATA:
|
|
case T_IMEMO:
|
|
{
|
|
struct obj_traverse_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
};
|
|
RB_VM_LOCK_ENTER_NO_BARRIER();
|
|
{
|
|
rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d);
|
|
}
|
|
RB_VM_LOCK_LEAVE_NO_BARRIER();
|
|
if (d.stop) return 1;
|
|
}
|
|
break;
|
|
|
|
// unreachable
|
|
case T_CLASS:
|
|
case T_MODULE:
|
|
case T_ICLASS:
|
|
default:
|
|
rp(obj);
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
if (data->leave_func(obj) == traverse_stop) {
|
|
return 1;
|
|
}
|
|
else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// Argument of obj_traverse_final_i(), used for the final pass of rb_obj_traverse().
struct rb_obj_traverse_final_data {
|
|
rb_obj_traverse_final_func final_func; // callback applied to each visited object
|
|
int stopped; // set to 1 when final_func returned a non-zero result
|
|
};
|
|
|
|
static int
|
|
obj_traverse_final_i(st_data_t key, st_data_t val, st_data_t arg)
|
|
{
|
|
struct rb_obj_traverse_final_data *data = (void *)arg;
|
|
if (data->final_func(key)) {
|
|
data->stopped = 1;
|
|
return ST_STOP;
|
|
}
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
// 0: traverse all
|
|
// 1: stopped
|
|
static int
|
|
rb_obj_traverse(VALUE obj,
|
|
rb_obj_traverse_enter_func enter_func,
|
|
rb_obj_traverse_leave_func leave_func,
|
|
rb_obj_traverse_final_func final_func)
|
|
{
|
|
struct obj_traverse_data data = {
|
|
.enter_func = enter_func,
|
|
.leave_func = leave_func,
|
|
.rec = NULL,
|
|
};
|
|
|
|
if (obj_traverse_i(obj, &data)) return 1;
|
|
if (final_func && data.rec) {
|
|
struct rb_obj_traverse_final_data f = {final_func, 0};
|
|
st_foreach(data.rec, obj_traverse_final_i, (st_data_t)&f);
|
|
return f.stopped;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
frozen_shareable_p(VALUE obj, bool *made_shareable)
|
|
{
|
|
if (!RB_TYPE_P(obj, T_DATA)) {
|
|
return true;
|
|
}
|
|
else if (RTYPEDDATA_P(obj)) {
|
|
const rb_data_type_t *type = RTYPEDDATA_TYPE(obj);
|
|
if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) {
|
|
return true;
|
|
}
|
|
else if (made_shareable && rb_obj_is_proc(obj)) {
|
|
// special path to make shareable Proc.
|
|
rb_proc_ractor_make_shareable(obj);
|
|
*made_shareable = true;
|
|
VM_ASSERT(RB_OBJ_SHAREABLE_P(obj));
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
make_shareable_check_shareable(VALUE obj)
|
|
{
|
|
VM_ASSERT(!SPECIAL_CONST_P(obj));
|
|
bool made_shareable = false;
|
|
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
else if (!frozen_shareable_p(obj, &made_shareable)) {
|
|
if (made_shareable) {
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj);
|
|
}
|
|
}
|
|
|
|
if (!RB_OBJ_FROZEN_RAW(obj)) {
|
|
rb_funcall(obj, idFreeze, 0);
|
|
|
|
if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
|
|
rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
|
|
}
|
|
|
|
if (RB_OBJ_SHAREABLE_P(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
}
|
|
|
|
return traverse_cont;
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
mark_shareable(VALUE obj)
|
|
{
|
|
FL_SET_RAW(obj, RUBY_FL_SHAREABLE);
|
|
return traverse_cont;
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_make_shareable(VALUE obj)
|
|
{
|
|
rb_obj_traverse(obj,
|
|
make_shareable_check_shareable,
|
|
null_leave, mark_shareable);
|
|
return obj;
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_make_shareable_copy(VALUE obj)
|
|
{
|
|
VALUE copy = ractor_copy(obj);
|
|
return rb_ractor_make_shareable(copy);
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_ensure_shareable(VALUE obj, VALUE name)
|
|
{
|
|
if (!rb_ractor_shareable_p(obj)) {
|
|
VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE,
|
|
name);
|
|
rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message));
|
|
}
|
|
return obj;
|
|
}
|
|
|
|
void
|
|
rb_ractor_ensure_main_ractor(const char *msg)
|
|
{
|
|
if (!rb_ractor_main_p()) {
|
|
rb_raise(rb_eRactorIsolationError, "%s", msg);
|
|
}
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
shareable_p_enter(VALUE obj)
|
|
{
|
|
if (RB_OBJ_SHAREABLE_P(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
else if (RB_TYPE_P(obj, T_CLASS) ||
|
|
RB_TYPE_P(obj, T_MODULE) ||
|
|
RB_TYPE_P(obj, T_ICLASS)) {
|
|
// TODO: remove it
|
|
mark_shareable(obj);
|
|
return traverse_skip;
|
|
}
|
|
else if (RB_OBJ_FROZEN_RAW(obj) &&
|
|
frozen_shareable_p(obj, NULL)) {
|
|
return traverse_cont;
|
|
}
|
|
|
|
return traverse_stop; // fail
|
|
}
|
|
|
|
bool
|
|
rb_ractor_shareable_p_continue(VALUE obj)
|
|
{
|
|
if (rb_obj_traverse(obj,
|
|
shareable_p_enter, null_leave,
|
|
mark_shareable)) {
|
|
return false;
|
|
}
|
|
else {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
#if RACTOR_CHECK_MODE > 0
/*
 * enter callback of ractor_reset_belonging(): shareable objects are
 * skipped, every other object gets rb_ractor_setup_belonging() applied.
 */
static enum obj_traverse_iterator_result
reset_belonging_enter(VALUE obj)
{
    if (rb_ractor_shareable_p(obj)) {
        return traverse_skip;
    }

    rb_ractor_setup_belonging(obj);
    return traverse_cont;
}
#endif
|
|
|
|
static enum obj_traverse_iterator_result
|
|
null_leave(VALUE obj)
|
|
{
|
|
return traverse_cont;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_reset_belonging(VALUE obj)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
rb_obj_traverse(obj, reset_belonging_enter, null_leave, NULL);
|
|
#endif
|
|
return obj;
|
|
}
|
|
|
|
|
|
/// traverse and replace function
|
|
|
|
// 2: stop search
|
|
// 1: skip child
|
|
// 0: continue
|
|
|
|
struct obj_traverse_replace_data;
|
|
static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func)(VALUE obj, struct obj_traverse_replace_data *data);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func)(VALUE obj, struct obj_traverse_replace_data *data);
|
|
|
|
// State shared by one obj_traverse_replace_i() walk.
struct obj_traverse_replace_data {
|
|
rb_obj_traverse_replace_enter_func enter_func; // called before an object's children are processed; sets `replacement`
|
|
rb_obj_traverse_replace_leave_func leave_func; // called after an object's children were processed
|
|
|
|
st_table *rec; // maps visited objects to their replacement; created lazily
|
|
VALUE rec_hash; // identity hash object whose st_table `rec` points to
|
|
|
|
VALUE replacement; // replacement for the most recently processed object
|
|
bool move; // true for a move traversal, false for a copy traversal
|
|
};
|
|
|
|
// Argument handed to the table iterator callbacks of obj_traverse_replace_i().
struct obj_traverse_replace_callback_data {
|
|
bool stop; // set to true by a callback when the traversal must be aborted
|
|
VALUE src; // object that owns the table being iterated (passed to RB_OBJ_WRITTEN)
|
|
struct obj_traverse_replace_data *data; // state of the enclosing traversal
|
|
};
|
|
|
|
static int
|
|
obj_hash_traverse_replace_foreach_i(st_data_t key, st_data_t value, st_data_t argp, int error)
|
|
{
|
|
return ST_REPLACE;
|
|
}
|
|
|
|
static int
|
|
obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int exists)
|
|
{
|
|
struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
|
|
struct obj_traverse_replace_data *data = d->data;
|
|
|
|
if (obj_traverse_replace_i(*key, data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
else if (*key != data->replacement) {
|
|
VALUE v = *key = data->replacement;
|
|
RB_OBJ_WRITTEN(d->src, Qundef, v);
|
|
}
|
|
|
|
if (obj_traverse_replace_i(*val, data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
else if (*val != data->replacement) {
|
|
VALUE v = *val = data->replacement;
|
|
RB_OBJ_WRITTEN(d->src, Qundef, v);
|
|
}
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static int
|
|
obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x)
|
|
{
|
|
return ST_REPLACE;
|
|
}
|
|
|
|
static int
|
|
obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists)
|
|
{
|
|
struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
|
|
struct obj_traverse_replace_data *data = d->data;
|
|
|
|
if (obj_traverse_replace_i(*(VALUE *)val, data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
else if (*(VALUE *)val != data->replacement) {
|
|
VALUE v = *(VALUE *)val = data->replacement;
|
|
RB_OBJ_WRITTEN(d->src, Qundef, v);
|
|
}
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static struct st_table *
|
|
obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
|
|
{
|
|
if (UNLIKELY(!data->rec)) {
|
|
data->rec_hash = rb_ident_hash_new();
|
|
data->rec = RHASH_ST_TABLE(data->rec_hash);
|
|
}
|
|
return data->rec;
|
|
}
|
|
|
|
static void
|
|
obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
|
|
{
|
|
int *pcnt = (int *)ptr;
|
|
|
|
if (!rb_ractor_shareable_p(obj)) {
|
|
++*pcnt;
|
|
}
|
|
}
|
|
|
|
static int
|
|
obj_refer_only_shareables_p(VALUE obj)
|
|
{
|
|
int cnt = 0;
|
|
RB_VM_LOCK_ENTER_NO_BARRIER();
|
|
{
|
|
rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt);
|
|
}
|
|
RB_VM_LOCK_LEAVE_NO_BARRIER();
|
|
return cnt == 0;
|
|
}
|
|
|
|
static int
|
|
obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
st_data_t replacement;
|
|
|
|
if (RB_SPECIAL_CONST_P(obj)) {
|
|
data->replacement = obj;
|
|
return 0;
|
|
}
|
|
|
|
switch (data->enter_func(obj, data)) {
|
|
case traverse_cont: break;
|
|
case traverse_skip: return 0; // skip children
|
|
case traverse_stop: return 1; // stop search
|
|
}
|
|
|
|
replacement = (st_data_t)data->replacement;
|
|
|
|
if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) {
|
|
data->replacement = (VALUE)replacement;
|
|
return 0;
|
|
}
|
|
else {
|
|
st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement);
|
|
}
|
|
|
|
if (!data->move) {
|
|
obj = replacement;
|
|
}
|
|
|
|
#define CHECK_AND_REPLACE(v) do { \
|
|
VALUE _val = (v); \
|
|
if (obj_traverse_replace_i(_val, data)) { return 1; } \
|
|
else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
|
|
} while (0)
|
|
|
|
if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
|
|
struct gen_ivtbl *ivtbl;
|
|
rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
|
|
|
|
if (UNLIKELY(rb_shape_obj_too_complex(obj))) {
|
|
struct obj_traverse_replace_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
.src = obj,
|
|
};
|
|
rb_st_foreach_with_replace(
|
|
ivtbl->as.complex.table,
|
|
obj_iv_hash_traverse_replace_foreach_i,
|
|
obj_iv_hash_traverse_replace_i,
|
|
(st_data_t)&d
|
|
);
|
|
if (d.stop) return 1;
|
|
}
|
|
else {
|
|
for (uint32_t i = 0; i < ivtbl->as.shape.numiv; i++) {
|
|
if (!UNDEF_P(ivtbl->as.shape.ivptr[i])) {
|
|
CHECK_AND_REPLACE(ivtbl->as.shape.ivptr[i]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
switch (BUILTIN_TYPE(obj)) {
|
|
// no child node
|
|
case T_FLOAT:
|
|
case T_BIGNUM:
|
|
case T_REGEXP:
|
|
case T_FILE:
|
|
case T_SYMBOL:
|
|
case T_MATCH:
|
|
break;
|
|
case T_STRING:
|
|
rb_str_make_independent(obj);
|
|
break;
|
|
|
|
case T_OBJECT:
|
|
{
|
|
if (rb_shape_obj_too_complex(obj)) {
|
|
struct obj_traverse_replace_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
.src = obj,
|
|
};
|
|
rb_st_foreach_with_replace(
|
|
ROBJECT_IV_HASH(obj),
|
|
obj_iv_hash_traverse_replace_foreach_i,
|
|
obj_iv_hash_traverse_replace_i,
|
|
(st_data_t)&d
|
|
);
|
|
if (d.stop) return 1;
|
|
}
|
|
else {
|
|
uint32_t len = ROBJECT_IV_COUNT(obj);
|
|
VALUE *ptr = ROBJECT_IVPTR(obj);
|
|
|
|
for (uint32_t i = 0; i < len; i++) {
|
|
CHECK_AND_REPLACE(ptr[i]);
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_ARRAY:
|
|
{
|
|
rb_ary_cancel_sharing(obj);
|
|
|
|
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
|
|
VALUE e = rb_ary_entry(obj, i);
|
|
|
|
if (obj_traverse_replace_i(e, data)) {
|
|
return 1;
|
|
}
|
|
else if (e != data->replacement) {
|
|
RARRAY_ASET(obj, i, data->replacement);
|
|
}
|
|
}
|
|
RB_GC_GUARD(obj);
|
|
}
|
|
break;
|
|
case T_HASH:
|
|
{
|
|
struct obj_traverse_replace_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
.src = obj,
|
|
};
|
|
rb_hash_stlike_foreach_with_replace(obj,
|
|
obj_hash_traverse_replace_foreach_i,
|
|
obj_hash_traverse_replace_i,
|
|
(VALUE)&d);
|
|
if (d.stop) return 1;
|
|
// TODO: rehash here?
|
|
|
|
VALUE ifnone = RHASH_IFNONE(obj);
|
|
if (obj_traverse_replace_i(ifnone, data)) {
|
|
return 1;
|
|
}
|
|
else if (ifnone != data->replacement) {
|
|
RHASH_SET_IFNONE(obj, data->replacement);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_STRUCT:
|
|
{
|
|
long len = RSTRUCT_LEN(obj);
|
|
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
|
|
|
|
for (long i=0; i<len; i++) {
|
|
CHECK_AND_REPLACE(ptr[i]);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_RATIONAL:
|
|
CHECK_AND_REPLACE(RRATIONAL(obj)->num);
|
|
CHECK_AND_REPLACE(RRATIONAL(obj)->den);
|
|
break;
|
|
case T_COMPLEX:
|
|
CHECK_AND_REPLACE(RCOMPLEX(obj)->real);
|
|
CHECK_AND_REPLACE(RCOMPLEX(obj)->imag);
|
|
break;
|
|
|
|
case T_DATA:
|
|
if (!data->move && obj_refer_only_shareables_p(obj)) {
|
|
break;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not %s %"PRIsVALUE" object.",
|
|
data->move ? "move" : "copy", rb_class_of(obj));
|
|
}
|
|
|
|
case T_IMEMO:
|
|
// not supported yet
|
|
return 1;
|
|
|
|
// unreachable
|
|
case T_CLASS:
|
|
case T_MODULE:
|
|
case T_ICLASS:
|
|
default:
|
|
rp(obj);
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
data->replacement = (VALUE)replacement;
|
|
|
|
if (data->leave_func(obj, data) == traverse_stop) {
|
|
return 1;
|
|
}
|
|
else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// 0: traverse all
|
|
// 1: stopped
|
|
static VALUE
|
|
rb_obj_traverse_replace(VALUE obj,
|
|
rb_obj_traverse_replace_enter_func enter_func,
|
|
rb_obj_traverse_replace_leave_func leave_func,
|
|
bool move)
|
|
{
|
|
struct obj_traverse_replace_data data = {
|
|
.enter_func = enter_func,
|
|
.leave_func = leave_func,
|
|
.rec = NULL,
|
|
.replacement = Qundef,
|
|
.move = move,
|
|
};
|
|
|
|
if (obj_traverse_replace_i(obj, &data)) {
|
|
return Qundef;
|
|
}
|
|
else {
|
|
return data.replacement;
|
|
}
|
|
}
|
|
|
|
// Raw view of an object, used by ractor_moved_bang() and move_leave() to
// read and overwrite its fields directly.
struct RVALUE {
|
|
VALUE flags; // object flags word
|
|
VALUE klass; // class of the object
|
|
VALUE v1; // v1..v3: the three words following flags/klass
|
|
VALUE v2;
|
|
VALUE v3;
|
|
};
|
|
|
|
// Mask of the user flag bits FL_USER1 .. FL_USER19 (FL_USER0 is not part of it).
// Used to copy these bits to a moved object and to clear them on the source.
static const VALUE fl_users = FL_USER1 | FL_USER2 | FL_USER3 |
|
|
FL_USER4 | FL_USER5 | FL_USER6 | FL_USER7 |
|
|
FL_USER8 | FL_USER9 | FL_USER10 | FL_USER11 |
|
|
FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 |
|
|
FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19;
|
|
|
|
static void
|
|
ractor_moved_bang(VALUE obj)
|
|
{
|
|
// invalidate src object
|
|
struct RVALUE *rv = (void *)obj;
|
|
|
|
rv->klass = rb_cRactorMovedObject;
|
|
rv->v1 = 0;
|
|
rv->v2 = 0;
|
|
rv->v3 = 0;
|
|
rv->flags = rv->flags & ~fl_users;
|
|
|
|
if (BUILTIN_TYPE(obj) == T_OBJECT) ROBJECT_SET_SHAPE_ID(obj, ROOT_SHAPE_ID);
|
|
|
|
// TODO: record moved location
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
move_enter(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
data->replacement = obj;
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
data->replacement = rb_obj_alloc(RBASIC_CLASS(obj));
|
|
return traverse_cont;
|
|
}
|
|
}
|
|
|
|
void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c
|
|
|
|
static enum obj_traverse_iterator_result
|
|
move_leave(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
VALUE v = data->replacement;
|
|
struct RVALUE *dst = (struct RVALUE *)v;
|
|
struct RVALUE *src = (struct RVALUE *)obj;
|
|
|
|
dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users);
|
|
|
|
dst->v1 = src->v1;
|
|
dst->v2 = src->v2;
|
|
dst->v3 = src->v3;
|
|
|
|
if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
|
|
rb_replace_generic_ivar(v, obj);
|
|
}
|
|
|
|
// TODO: generic_ivar
|
|
|
|
ractor_moved_bang(obj);
|
|
return traverse_cont;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_move(VALUE obj)
|
|
{
|
|
VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true);
|
|
if (!UNDEF_P(val)) {
|
|
return val;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not move the object");
|
|
}
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
copy_enter(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
data->replacement = obj;
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
data->replacement = rb_obj_clone(obj);
|
|
return traverse_cont;
|
|
}
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
copy_leave(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
return traverse_cont;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_copy(VALUE obj)
|
|
{
|
|
VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false);
|
|
if (!UNDEF_P(val)) {
|
|
return val;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not copy the object");
|
|
}
|
|
}
|
|
|
|
// Ractor local storage
|
|
|
|
// Key that identifies one ractor-local storage slot.
struct rb_ractor_local_key_struct {
|
|
const struct rb_ractor_local_storage_type *type; // mark/free callbacks applied to stored values
|
|
void *main_cache; // value cached for the main ractor; Qundef while nothing is stored
|
|
};
|
|
|
|
// Keys passed to rb_ractor_local_storage_delkey(); they stay queued here
// until rb_ractor_finish_marking() frees them.
static struct freed_ractor_local_keys_struct {
|
|
int cnt; // number of keys currently queued
|
|
int capa; // number of slots allocated in `keys`
|
|
rb_ractor_local_key_t *keys; // growable array of queued keys
|
|
} freed_ractor_local_keys;
|
|
|
|
static int
|
|
ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
|
|
{
|
|
struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
|
|
if (k->type->mark) (*k->type->mark)((void *)val);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static enum rb_id_table_iterator_result
|
|
idkey_local_storage_mark_i(ID id, VALUE val, void *dmy)
|
|
{
|
|
rb_gc_mark(val);
|
|
return ID_TABLE_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_local_storage_mark(rb_ractor_t *r)
|
|
{
|
|
if (r->local_storage) {
|
|
st_foreach(r->local_storage, ractor_local_storage_mark_i, 0);
|
|
|
|
for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
|
|
rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i];
|
|
st_data_t val, k = (st_data_t)key;
|
|
if (st_delete(r->local_storage, &k, &val) &&
|
|
(key = (rb_ractor_local_key_t)k)->type->free) {
|
|
(*key->type->free)((void *)val);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (r->idkey_local_storage) {
|
|
rb_id_table_foreach(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
|
|
}
|
|
}
|
|
|
|
static int
|
|
ractor_local_storage_free_i(st_data_t key, st_data_t val, st_data_t dmy)
|
|
{
|
|
struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
|
|
if (k->type->free) (*k->type->free)((void *)val);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_local_storage_free(rb_ractor_t *r)
|
|
{
|
|
if (r->local_storage) {
|
|
st_foreach(r->local_storage, ractor_local_storage_free_i, 0);
|
|
st_free_table(r->local_storage);
|
|
}
|
|
|
|
if (r->idkey_local_storage) {
|
|
rb_id_table_free(r->idkey_local_storage);
|
|
}
|
|
}
|
|
|
|
static void
|
|
rb_ractor_local_storage_value_mark(void *ptr)
|
|
{
|
|
rb_gc_mark((VALUE)ptr);
|
|
}
|
|
|
|
static const struct rb_ractor_local_storage_type ractor_local_storage_type_null = {
|
|
NULL,
|
|
NULL,
|
|
};
|
|
|
|
const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free = {
|
|
NULL,
|
|
ruby_xfree,
|
|
};
|
|
|
|
static const struct rb_ractor_local_storage_type ractor_local_storage_type_value = {
|
|
rb_ractor_local_storage_value_mark,
|
|
NULL,
|
|
};
|
|
|
|
rb_ractor_local_key_t
|
|
rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type)
|
|
{
|
|
rb_ractor_local_key_t key = ALLOC(struct rb_ractor_local_key_struct);
|
|
key->type = type ? type : &ractor_local_storage_type_null;
|
|
key->main_cache = (void *)Qundef;
|
|
return key;
|
|
}
|
|
|
|
rb_ractor_local_key_t
|
|
rb_ractor_local_storage_value_newkey(void)
|
|
{
|
|
return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value);
|
|
}
|
|
|
|
void
|
|
rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
|
|
{
|
|
RB_VM_LOCK_ENTER();
|
|
{
|
|
if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
|
|
freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
|
|
REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
|
|
}
|
|
freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
|
|
}
|
|
RB_VM_LOCK_LEAVE();
|
|
}
|
|
|
|
static bool
|
|
ractor_local_ref(rb_ractor_local_key_t key, void **pret)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
if (!UNDEF_P((VALUE)key->main_cache)) {
|
|
*pret = key->main_cache;
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
|
|
if (cr->local_storage && st_lookup(cr->local_storage, (st_data_t)key, (st_data_t *)pret)) {
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_local_set(rb_ractor_local_key_t key, void *ptr)
|
|
{
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
|
|
if (cr->local_storage == NULL) {
|
|
cr->local_storage = st_init_numtable();
|
|
}
|
|
|
|
st_insert(cr->local_storage, (st_data_t)key, (st_data_t)ptr);
|
|
|
|
if (rb_ractor_main_p()) {
|
|
key->main_cache = ptr;
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_local_storage_value(rb_ractor_local_key_t key)
|
|
{
|
|
void *val;
|
|
if (ractor_local_ref(key, &val)) {
|
|
return (VALUE)val;
|
|
}
|
|
else {
|
|
return Qnil;
|
|
}
|
|
}
|
|
|
|
bool
|
|
rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val)
|
|
{
|
|
if (ractor_local_ref(key, (void **)val)) {
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val)
|
|
{
|
|
ractor_local_set(key, (void *)val);
|
|
}
|
|
|
|
void *
|
|
rb_ractor_local_storage_ptr(rb_ractor_local_key_t key)
|
|
{
|
|
void *ret;
|
|
if (ractor_local_ref(key, &ret)) {
|
|
return ret;
|
|
}
|
|
else {
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr)
|
|
{
|
|
ractor_local_set(key, ptr);
|
|
}
|
|
|
|
#define DEFAULT_KEYS_CAPA 0x10
|
|
|
|
void
|
|
rb_ractor_finish_marking(void)
|
|
{
|
|
for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
|
|
ruby_xfree(freed_ractor_local_keys.keys[i]);
|
|
}
|
|
freed_ractor_local_keys.cnt = 0;
|
|
if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) {
|
|
freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA;
|
|
REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA);
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ID id = rb_check_id(&sym);
|
|
struct rb_id_table *tbl = cr->idkey_local_storage;
|
|
VALUE val;
|
|
|
|
if (id && tbl && rb_id_table_lookup(tbl, id, &val)) {
|
|
return val;
|
|
}
|
|
else {
|
|
return Qnil;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ID id = SYM2ID(rb_to_symbol(sym));
|
|
struct rb_id_table *tbl = cr->idkey_local_storage;
|
|
|
|
if (tbl == NULL) {
|
|
tbl = cr->idkey_local_storage = rb_id_table_create(2);
|
|
}
|
|
rb_id_table_insert(tbl, id, val);
|
|
return val;
|
|
}
|
|
|
|
#include "ractor.rbinc"
|