Add support for pthread condition variables and add a test.

This commit is contained in:
Jukka Jylänki 2014-11-29 19:54:50 +02:00
Родитель fb4b1e9fa1
Коммит d7c47b1ffe
9 изменённых файлов: 325 добавлений и 1 удалений

Просмотреть файл

@ -0,0 +1,57 @@
#include <pthread.h>
#include <time.h>
#include <errno.h>
#ifdef __EMSCRIPTEN__
#include <emscripten/threading.h>
#include <emscripten/emscripten.h>
#else
#include "futex.h"
#endif
#include "syscall.h"
static int do_wait(volatile int *addr, int val,
clockid_t clk, const struct timespec *at, int priv)
{
int r;
struct timespec to, *top=0;
if (at) {
if (at->tv_nsec >= 1000000000UL) return EINVAL;
if (clock_gettime(clk, &to)) return EINVAL;
to.tv_sec = at->tv_sec - to.tv_sec;
if ((to.tv_nsec = at->tv_nsec - to.tv_nsec) < 0) {
to.tv_sec--;
to.tv_nsec += 1000000000;
}
if (to.tv_sec < 0) return ETIMEDOUT;
top = &to;
}
#ifdef __EMSCRIPTEN__
double timeout = top->tv_sec * 1000000000.0 + top->tv_nsec;
if (timeout > 1 * 1000000000.0) timeout = 1 * 1000000000.0;
EM_ASM_INT( { Module['printErr']('Wait ' + $0 + '.') }, timeout);
r = emscripten_futex_wait((void*)addr, val, timeout);
#else
r = -__syscall_cp(SYS_futex, addr, FUTEX_WAIT, val, top);
#endif
if (r == EINTR || r == EINVAL || r == ETIMEDOUT) return r;
return 0;
}
int __timedwait(volatile int *addr, int val,
clockid_t clk, const struct timespec *at,
void (*cleanup)(void *), void *arg, int priv)
{
int r, cs;
if (!cleanup) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cs);
pthread_cleanup_push(cleanup, arg);
r = do_wait(addr, val, clk, at, priv);
pthread_cleanup_pop(0);
if (!cleanup) pthread_setcancelstate(cs, 0);
return r;
}

Просмотреть файл

@ -0,0 +1,43 @@
#include "pthread_impl.h"
int pthread_cond_broadcast(pthread_cond_t *c)
{
pthread_mutex_t *m;
if (!c->_c_waiters) return 0;
a_inc(&c->_c_seq);
/* If cond var is process-shared, simply wake all waiters. */
if (c->_c_mutex == (void *)-1) {
__wake(&c->_c_seq, -1, 0);
return 0;
}
/* Block waiters from returning so we can use the mutex. */
while (a_swap(&c->_c_lock, 1))
__wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
if (!c->_c_waiters)
goto out;
m = c->_c_mutex;
/* Move waiter count to the mutex */
a_fetch_add(&m->_m_waiters, c->_c_waiters2);
c->_c_waiters2 = 0;
#ifdef __EMSCRIPTEN__
emscripten_futex_requeue(&c->_c_seq, !m->_m_type || (m->_m_lock&INT_MAX)!=pthread_self()->tid, &m->_m_lock, INT_MAX);
#else
/* Perform the futex requeue, waking one waiter unless we know
* that the calling thread holds the mutex. */
__syscall(SYS_futex, &c->_c_seq, FUTEX_REQUEUE,
!m->_m_type || (m->_m_lock&INT_MAX)!=pthread_self()->tid,
INT_MAX, &m->_m_lock);
#endif
out:
a_store(&c->_c_lock, 0);
if (c->_c_lockwait) __wake(&c->_c_lock, 1, 0);
return 0;
}

Просмотреть файл

@ -0,0 +1,13 @@
#include "pthread_impl.h"
int pthread_cond_destroy(pthread_cond_t *c)
{
int priv = c->_c_mutex != (void *)-1;
int cnt;
c->_c_destroy = 1;
if (c->_c_waiters)
__wake(&c->_c_seq, -1, priv);
while ((cnt = c->_c_waiters))
__wait(&c->_c_waiters, 0, cnt, priv);
return 0;
}

Просмотреть файл

@ -0,0 +1,11 @@
#include "pthread_impl.h"
int pthread_cond_init(pthread_cond_t *restrict c, const pthread_condattr_t *restrict a)
{
*c = (pthread_cond_t){0};
if (a) {
c->_c_clock = a->__attr & 0x7fffffff;
if (a->__attr>>31) c->_c_mutex = (void *)-1;
}
return 0;
}

Просмотреть файл

@ -0,0 +1,9 @@
#include "pthread_impl.h"
int pthread_cond_signal(pthread_cond_t *c)
{
if (!c->_c_waiters) return 0;
a_inc(&c->_c_seq);
if (c->_c_waiters) __wake(&c->_c_seq, 1, 0);
return 0;
}

Просмотреть файл

@ -0,0 +1,76 @@
#include "pthread_impl.h"
struct cm {
pthread_cond_t *c;
pthread_mutex_t *m;
};
static void unwait(pthread_cond_t *c, pthread_mutex_t *m)
{
/* Removing a waiter is non-trivial if we could be using requeue
* based broadcast signals, due to mutex access issues, etc. */
if (c->_c_mutex == (void *)-1) {
a_dec(&c->_c_waiters);
if (c->_c_destroy) __wake(&c->_c_waiters, 1, 0);
return;
}
while (a_swap(&c->_c_lock, 1))
__wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
if (c->_c_waiters2) c->_c_waiters2--;
else a_dec(&m->_m_waiters);
a_store(&c->_c_lock, 0);
if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
a_dec(&c->_c_waiters);
if (c->_c_destroy) __wake(&c->_c_waiters, 1, 1);
}
static void cleanup(void *p)
{
struct cm *cm = p;
unwait(cm->c, cm->m);
pthread_mutex_lock(cm->m);
}
int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict m, const struct timespec *restrict ts)
{
struct cm cm = { .c=c, .m=m };
int r, e=0, seq;
if (m->_m_type && (m->_m_lock&INT_MAX) != pthread_self()->tid)
return EPERM;
if (ts && ts->tv_nsec >= 1000000000UL)
return EINVAL;
pthread_testcancel();
a_inc(&c->_c_waiters);
if (c->_c_mutex != (void *)-1) {
c->_c_mutex = m;
while (a_swap(&c->_c_lock, 1))
__wait(&c->_c_lock, &c->_c_lockwait, 1, 1);
c->_c_waiters2++;
a_store(&c->_c_lock, 0);
if (c->_c_lockwait) __wake(&c->_c_lock, 1, 1);
}
seq = c->_c_seq;
pthread_mutex_unlock(m);
do e = __timedwait(&c->_c_seq, seq, c->_c_clock, ts, cleanup, &cm, 0);
while (c->_c_seq == seq && (!e || e==EINTR));
if (e == EINTR) e = 0;
unwait(c, m);
if ((r=pthread_mutex_lock(m))) return r;
return e;
}

Просмотреть файл

@ -0,0 +1,6 @@
#include "pthread_impl.h"
int pthread_cond_wait(pthread_cond_t *restrict c, pthread_mutex_t *restrict m)
{
return pthread_cond_timedwait(c, m, 0);
}

Просмотреть файл

@ -0,0 +1,105 @@
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <emscripten/emscripten.h>
#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12
int count = 0;
int thread_ids[3] = {0,1,2};
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;
void *inc_count(void *t)
{
int i;
long my_id = (long)t;
for (i=0; i<TCOUNT; i++) {
pthread_mutex_lock(&count_mutex);
count++;
/*
Check the value of count and signal waiting thread when condition is
reached. Note that this occurs while mutex is locked.
*/
if (count == COUNT_LIMIT) {
pthread_cond_signal(&count_threshold_cv);
// printf("inc_count(): thread %ld, count = %d Threshold reached.\n",
// my_id, count);
EM_ASM_INT( { Module['print']('inc_count(): thread ' + $0 + ', count = ' + $1 + ', Threshold reached.'); }, my_id, count);
}
// printf("inc_count(): thread %ld, count = %d, unlocking mutex\n",
// my_id, count);
EM_ASM_INT( { Module['print']('inc_count(): thread ' + $0 + ', count = ' + $1 + ', unlocking mutex.'); }, my_id, count);
pthread_mutex_unlock(&count_mutex);
/* Do some "work" so threads can alternate on mutex lock */
//sleep(1);
}
pthread_exit(NULL);
}
void *watch_count(void *t)
{
long my_id = (long)t;
// printf("Starting watch_count(): thread %ld\n", my_id);
EM_ASM_INT( { Module['print']('Starting watch_count(): thread ' + $0); }, my_id);
/*
Lock mutex and wait for signal. Note that the pthread_cond_wait
routine will automatically and atomically unlock mutex while it waits.
Also, note that if COUNT_LIMIT is reached before this routine is run by
the waiting thread, the loop will be skipped to prevent pthread_cond_wait
from never returning.
*/
pthread_mutex_lock(&count_mutex);
while (count<COUNT_LIMIT) {
pthread_cond_wait(&count_threshold_cv, &count_mutex);
// printf("watch_count(): thread %ld Condition signal received.\n", my_id);
EM_ASM_INT( { Module['print']('watch_count(): thread ' + $0 + ' Condition signal received.'); }, my_id);
count += 125;
// printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
EM_ASM_INT( { Module['print']('watch_count(): thread ' + $0 + ', count now = ' + $1); }, my_id, count);
}
pthread_mutex_unlock(&count_mutex);
pthread_exit(NULL);
}
int main (int argc, char *argv[])
{
int i, rc;
long t1=1, t2=2, t3=3;
pthread_t threads[3];
pthread_attr_t attr;
/* Initialize mutex and condition variable objects */
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init (&count_threshold_cv, NULL);
/* For portability, explicitly create threads in a joinable state */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void *)t1);
pthread_create(&threads[1], &attr, inc_count, (void *)t2);
pthread_create(&threads[2], &attr, inc_count, (void *)t3);
/* Wait for all threads to complete */
for (i=0; i<NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf ("Main(): Waited on %d threads. Done.\n", NUM_THREADS);
/* Clean up and exit */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
#ifdef REPORT_RESULT
int result = 0;
REPORT_RESULT();
#endif
pthread_exit(NULL);
}

Просмотреть файл

@ -2574,6 +2574,10 @@ window.close = function() {
for arg in [[], ['-DUSE_C_VOLATILE']]:
self.btest(path_from_root('tests', 'pthread', 'test_pthread_volatile.cpp'), expected='1', args=['-lpthread'] + arg)
# Test that basic thread creation works.
# Test thread-specific data (TLS).
def test_pthread_thread_local_storage(self):
self.btest(path_from_root('tests', 'pthread', 'test_pthread_thread_local_storage.cpp'), expected='0', args=['-lpthread'])
# Test the pthread condition variable creation and waiting.
def test_pthread_condition_variable(self):
self.btest(path_from_root('tests', 'pthread', 'test_pthread_condition_variable.cpp'), expected='0', args=['-lpthread'])