This patch introduces thread-specific storage APIs
for tools that use the `rb_internal_thread_event_hook` APIs.

* `rb_internal_thread_specific_key_create()` creates a tool-specific
  thread-local storage key and allocates the storage if it is not
  available yet.
* `rb_internal_thread_specific_set()` stores data in the thread- and
  tool-specific storage.
* `rb_internal_thread_specific_get()` retrieves data from the thread- and
  tool-specific storage.

Note that `rb_internal_thread_specific_get|set(thread_val, key)` can be
called without the GVL; both functions are async-signal safe and safe to
call from multiple native threads. So you can call them from any internal
thread event hook, and also from other native threads. Of course,
`thread_val` must remain alive while its data is accessed through these
functions.
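
For illustration, here is a minimal sketch of such a tool (hypothetical
code, not part of this patch). It assumes the `rb_internal_thread_event_hook`
API as of Ruby 3.3, where the callback's `event_data->thread` carries the
thread as a VALUE; the names `tool_key`, `tool_hook`, `struct tool_data`,
and `Init_tool` are made up for the example:

```c
#include <stdlib.h>
#include <ruby/ruby.h>
#include <ruby/thread.h>

/* Hypothetical per-thread record kept by the tool. */
struct tool_data { unsigned long suspended_count; };

static rb_internal_thread_specific_key_t tool_key; // gvar, created once

static void
tool_hook(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
{
    (void)user_data;
    /* Runs without the GVL, possibly on a foreign native thread;
     * the *_specific_get/set calls are safe in this context. */
    if (event == RUBY_INTERNAL_THREAD_EVENT_STARTED) {
        rb_internal_thread_specific_set(event_data->thread, tool_key,
                                        calloc(1, sizeof(struct tool_data)));
    }
    else if (event == RUBY_INTERNAL_THREAD_EVENT_SUSPENDED) {
        struct tool_data *data = rb_internal_thread_specific_get(event_data->thread, tool_key);
        if (data) data->suspended_count++; // NULL for threads started before the hook
    }
}

void
Init_tool(void)
{
    tool_key = rb_internal_thread_specific_key_create();
    rb_internal_thread_add_event_hook(tool_hook,
        RUBY_INTERNAL_THREAD_EVENT_STARTED | RUBY_INTERNAL_THREAD_EVENT_SUSPENDED,
        NULL);
}
```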

Note that you should not forget to clean up the data you set.
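
Continuing the hypothetical sketch above, cleanup can be done from an
`EXITED` hook; the VM frees only the `specific_storage` array itself (see
the `thread_free()` change below), never the pointers stored in it:

```c
static void
tool_exit_hook(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
{
    (void)user_data;
    if (event == RUBY_INTERNAL_THREAD_EVENT_EXITED) {
        /* Free the tool's own record; the VM later frees only the slot array. */
        struct tool_data *data = rb_internal_thread_specific_get(event_data->thread, tool_key);
        if (data) {
            free(data);
            rb_internal_thread_specific_set(event_data->thread, tool_key, NULL); // defensive reset
        }
    }
}
```

The hook would be registered with
`rb_internal_thread_add_event_hook(tool_exit_hook, RUBY_INTERNAL_THREAD_EVENT_EXITED, NULL)`,
just like the one above.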
Koichi Sasada 2023-11-17 02:29:11 +09:00
Parent 9b7a964318
Commit 352a885a0f
5 changed files with 114 additions and 0 deletions


@@ -267,6 +267,46 @@ rb_internal_thread_event_hook_t *rb_internal_thread_add_event_hook(
bool rb_internal_thread_remove_event_hook(
    rb_internal_thread_event_hook_t * hook);
typedef int rb_internal_thread_specific_key_t;
#define RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX 8
/**
 * Create a key to store thread-specific data.
 *
 * These APIs are designed for tools using
 * rb_internal_thread_event_hook APIs.
 *
 * Note that only `RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX` keys
 * can be created. Raises `ThreadError` if the limit is exceeded.
 *
 * Usage:
 *   // at initialization time:
 *   int tool_key; // gvar
 *   Init_tool() {
 *       tool_key = rb_internal_thread_specific_key_create();
 *   }
 *
 *   // at any time:
 *   rb_internal_thread_specific_set(thread, tool_key, per_thread_data);
 *   ...
 *   per_thread_data = rb_internal_thread_specific_get(thread, tool_key);
 */
rb_internal_thread_specific_key_t rb_internal_thread_specific_key_create(void);
/**
* Get thread and tool specific data.
*
* This function is async signal safe and thread safe.
*/
void *rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key);
/**
* Set thread and tool specific data.
*
* This function is async signal safe and thread safe.
*/
void rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data);
RBIMPL_SYMBOL_EXPORT_END()
#endif /* RUBY_THREAD_H */


@@ -1530,6 +1530,7 @@ module RubyVM::RJIT # :nodoc: all
scheduler: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), scheduler)")],
blocking: [CType::Immediate.parse("unsigned int"), Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), blocking)")],
name: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), name)")],
specific_storage: [CType::Pointer.new { CType::Pointer.new { CType::Immediate.parse("void") } }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), specific_storage)")],
ext_config: [self.rb_ext_config, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), ext_config)")],
)
end


@@ -149,6 +149,7 @@ NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
static int consume_communication_pipe(int fd);
static volatile int system_working = 1;
static rb_internal_thread_specific_key_t specific_key_count;
struct waiting_fd {
    struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
@@ -778,6 +779,8 @@ struct thread_create_params {
    VALUE (*fn)(void *);
};
static void thread_specific_storage_alloc(rb_thread_t *th);
static VALUE
thread_create_core(VALUE thval, struct thread_create_params *params)
{
@@ -785,6 +788,8 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
    rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
    int err;

    thread_specific_storage_alloc(th);

    if (OBJ_FROZEN(current_th->thgroup)) {
        rb_raise(rb_eThreadError,
                 "can't start a new thread (frozen ThreadGroup)");
@@ -5783,3 +5788,66 @@ rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
    RUBY_VM_CHECK_INTS(cur_th->ec);
    return ret;
}

static void
thread_specific_storage_alloc(rb_thread_t *th)
{
    VM_ASSERT(th->specific_storage == NULL);

    if (UNLIKELY(specific_key_count > 0)) {
        th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
    }
}

rb_internal_thread_specific_key_t
rb_internal_thread_specific_key_create(void)
{
    rb_vm_t *vm = GET_VM();

    if (specific_key_count == 0 && vm->ractor.cnt > 1) {
        rb_raise(rb_eThreadError, "The first rb_internal_thread_specific_key_create() is called with multiple ractors");
    }
    else if (specific_key_count > RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) {
        rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() is called more than %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
    }
    else {
        rb_internal_thread_specific_key_t key = specific_key_count++;

        if (key == 0) {
            // allocate
            rb_ractor_t *cr = GET_RACTOR();
            rb_thread_t *th;

            ccan_list_for_each(&cr->threads.set, th, lt_node) {
                thread_specific_storage_alloc(th);
            }
        }
        return key;
    }
}

// async and native thread safe.
void *
rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key)
{
    rb_thread_t *th = DATA_PTR(thread_val);

    VM_ASSERT(rb_thread_ptr(thread_val) == th);
    VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
    VM_ASSERT(th->specific_storage);

    return th->specific_storage[key];
}

// async and native thread safe.
void
rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data)
{
    rb_thread_t *th = DATA_PTR(thread_val);

    VM_ASSERT(rb_thread_ptr(thread_val) == th);
    VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
    VM_ASSERT(th->specific_storage);

    th->specific_storage[key] = data;
}

vm.c

@@ -3435,6 +3435,10 @@ thread_free(void *ptr)
        rb_bug("thread_free: keeping_mutexes must be NULL (%p:%p)", (void *)th, (void *)th->keeping_mutexes);
    }

    if (th->specific_storage) {
        ruby_xfree(th->specific_storage);
    }

    rb_threadptr_root_fiber_release(th);

    if (th->vm && th->vm->ractor.main_thread == th) {


@@ -1152,6 +1152,7 @@ typedef struct rb_thread_struct {
    /* misc */
    VALUE name;
    void **specific_storage;

    struct rb_ext_config ext_config;
} rb_thread_t;