diff --git a/nsprpub/pr/include/prtpool.h b/nsprpub/pr/include/prtpool.h new file mode 100644 index 00000000000..82ce2663516 --- /dev/null +++ b/nsprpub/pr/include/prtpool.h @@ -0,0 +1,86 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* + * The contents of this file are subject to the Netscape Public License + * Version 1.1 (the "NPL"); you may not use this file except in + * compliance with the NPL. You may obtain a copy of the NPL at + * http://www.mozilla.org/NPL/ + * + * Software distributed under the NPL is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL + * for the specific language governing rights and limitations under the + * NPL. + * + * The Initial Developer of this code under the NPL is Netscape + * Communications Corporation. Portions created by Netscape are + * Copyright (C) 1999 Netscape Communications Corporation. All Rights + * Reserved. + */ + +#ifndef prtpool_h___ +#define prtpool_h___ + +#include "prtypes.h" +#include "prthread.h" +#include "prio.h" +#include "prerr.h" +#include "prerror.h" + +PR_BEGIN_EXTERN_C + +typedef struct PRJobIoDesc { + PRFileDesc *socket; + PRErrorCode error; + PRIntervalTime timeout; +} PRJobIoDesc; + +typedef struct PRThreadPool PRThreadPool; +typedef struct PRJob PRJob; +typedef void (PR_CALLBACK *JobFn) (void *arg); + +/* Create thread pool */ +PR_EXTERN(PRThreadPool *) +PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, + PRSize stacksize); + +/* queue a job */ +PR_EXTERN(PRJob *) +PR_QueueJob(PRThreadPool *tpool, JobFn fn, void *arg, PRBool joinable); + +/* queue a job, when a socket is readable */ +PR_EXTERN(PRJob *) +PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, + JobFn fn, void * arg, PRBool joinable); + +/* queue a job, when a socket is writeable */ +PR_EXTERN(PRJob *) +PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, + JobFn fn, void * arg, PRBool joinable); + +/* queue a job, when 
a socket has a pending connection */ +PR_EXTERN(PRJob *) +PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, + JobFn fn, void * arg, PRBool joinable); + +/* queue a job, when a timer expires */ +PR_EXTERN(PRJob *) +PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, + JobFn fn, void * arg, PRBool joinable); +/* cancel a job */ +PR_EXTERN(PRStatus) +PR_CancelJob(PRJob *job); + +/* join a job */ +PR_EXTERN(PRStatus) +PR_JoinJob(PRJob *job); + +/* shutdown pool */ +PR_EXTERN(PRStatus) +PR_ShutdownThreadPool(PRThreadPool *tpool); + +/* join pool, wait for exit of all threads */ +PR_EXTERN(PRStatus) +PR_JoinThreadPool(PRThreadPool *tpool); + +PR_END_EXTERN_C + +#endif /* prtpool_h___ */ diff --git a/nsprpub/pr/src/Makefile b/nsprpub/pr/src/Makefile index f93aba826fe..808f98aa2a6 100644 --- a/nsprpub/pr/src/Makefile +++ b/nsprpub/pr/src/Makefile @@ -189,6 +189,7 @@ OBJS = \ misc/$(OBJDIR)/prrng.$(OBJ_SUFFIX) \ misc/$(OBJDIR)/prsystem.$(OBJ_SUFFIX) \ misc/$(OBJDIR)/prthinfo.$(OBJ_SUFFIX) \ + misc/$(OBJDIR)/prtpool.$(OBJ_SUFFIX) \ misc/$(OBJDIR)/prtrace.$(OBJ_SUFFIX) \ misc/$(OBJDIR)/prtime.$(OBJ_SUFFIX) diff --git a/nsprpub/pr/src/misc/Makefile b/nsprpub/pr/src/misc/Makefile index fd3985c252a..842226dddf5 100644 --- a/nsprpub/pr/src/misc/Makefile +++ b/nsprpub/pr/src/misc/Makefile @@ -48,6 +48,7 @@ CSRCS = \ prsystem.c \ prtime.c \ prthinfo.c \ + prtpool.c \ prtrace.c \ $(NULL) diff --git a/nsprpub/pr/src/misc/prtpool.c b/nsprpub/pr/src/misc/prtpool.c new file mode 100644 index 00000000000..64aa8426781 --- /dev/null +++ b/nsprpub/pr/src/misc/prtpool.c @@ -0,0 +1,1127 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* + * The contents of this file are subject to the Netscape Public License + * Version 1.1 (the "NPL"); you may not use this file except in + * compliance with the NPL. 
You may obtain a copy of the NPL at + * http://www.mozilla.org/NPL/ + * + * Software distributed under the NPL is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL + * for the specific language governing rights and limitations under the + * NPL. + * + * The Initial Developer of this code under the NPL is Netscape + * Communications Corporation. Portions created by Netscape are + * Copyright (C) 1999 Netscape Communications Corporation. All Rights + * Reserved. + */ + +#include "prtpool.h" +#include "nspr.h" + +/* + * Thread pools + * Thread pools create and manage threads to provide support for + * scheduling jobs onto one or more threads. + * + */ +#ifdef OPT_WINNT +#include <windows.h> +#endif + +/* + * worker thread + */ +typedef struct wthread { + PRCList links; + PRThread *thread; +} wthread; + +/* + * queue of timer jobs + */ +typedef struct timer_jobq { + PRCList list; + PRLock *lock; + PRCondVar *cv; + PRInt32 cnt; + PRCList wthreads; +} timer_jobq; + +/* + * queue of jobs + */ +typedef struct tp_jobq { + PRCList list; + PRInt32 cnt; + PRLock *lock; + PRCondVar *cv; + PRCList wthreads; +#ifdef OPT_WINNT + HANDLE nt_completion_port; +#endif +} tp_jobq; + +/* + * queue of IO jobs + */ +typedef struct io_jobq { + PRCList list; + PRPollDesc *pollfds; + PRInt32 npollfds; + PRJob **polljobs; + PRLock *lock; + PRInt32 cnt; + PRFileDesc *notify_fd; + PRCList wthreads; +} io_jobq; + +/* + * Threadpool + */ +struct PRThreadPool { + PRInt32 init_threads; + PRInt32 max_threads; + PRInt32 current_threads; + PRInt32 idle_threads; + PRInt32 stacksize; + tp_jobq jobq; + io_jobq ioq; + timer_jobq timerq; + PRCondVar *shutdown_cv; + PRBool shutdown; +}; + +typedef enum io_op_type + { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; + +typedef enum _PRJobStatus + { JOB_ON_TIMERQ, JOB_ON_IOQ, JOB_QUEUED, JOB_RUNNING, JOB_COMPLETED, + JOB_CANCELED, JOB_FREED } _PRJobStatus; + +typedef void (* Jobfn)(void *arg); + 
+#ifdef OPT_WINNT +typedef struct NT_notifier { + OVERLAPPED overlapped; /* must be first */ + PRJob *jobp; +} NT_notifier; +#endif + +struct PRJob { + PRCList links; /* for linking jobs */ + _PRJobStatus status; + PRBool joinable; + Jobfn job_func; + void *job_arg; + PRLock *jlock; + PRCondVar *join_cv; + PRThreadPool *tpool; /* back pointer to thread pool */ + PRJobIoDesc *iod; + io_op_type io_op; + PRInt16 io_poll_flags; + PRNetAddr *netaddr; + PRIntervalTime timeout; /* relative value */ + PRIntervalTime absolute; +#ifdef OPT_WINNT + NT_notifier nt_notifier; +#endif +}; + +#define JOB_LINKS_PTR(_qp) \ + ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) + +#define WTHREAD_LINKS_PTR(_qp) \ + ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) + +static void delete_job(PRJob *jobp); +static PRThreadPool * alloc_threadpool(); +static PRJob * alloc_job(PRBool joinable); +static void notify_ioq(PRThreadPool *tp); +static void notify_timerq(PRThreadPool *tp); + +/* + * worker thread function + */ +static void wstart(void *arg) +{ +PRThreadPool *tp = (PRThreadPool *) arg; +PRCList *head; + + /* + * execute jobs until shutdown + */ + while (!tp->shutdown) { + PRJob *jobp; +#ifdef OPT_WINNT + BOOL rv; + DWORD unused, shutdown; + LPOVERLAPPED olp; + + PR_Lock(tp->jobq.lock); + tp->idle_threads++; + PR_Unlock(tp->jobq.lock); + rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, + &unused, &shutdown, &olp, INFINITE); + + PR_ASSERT(rv); + if (shutdown) + break; + jobp = ((NT_notifier *) olp)->jobp; + PR_Lock(tp->jobq.lock); + tp->idle_threads--; + tp->jobq.cnt--; + PR_Unlock(tp->jobq.lock); + PR_Lock(jobp->jlock); + jobp->status = JOB_RUNNING; + PR_Unlock(jobp->jlock); +#else + + PR_Lock(tp->jobq.lock); + while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { + tp->idle_threads++; + PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); + } + tp->idle_threads--; + if (tp->shutdown) { + PR_Unlock(tp->jobq.lock); + break; + } + head = 
PR_LIST_HEAD(&tp->jobq.list); + /* + * remove job from queue + */ + PR_REMOVE_AND_INIT_LINK(head); + tp->jobq.cnt--; + jobp = JOB_LINKS_PTR(head); + PR_Lock(jobp->jlock); + jobp->status = JOB_RUNNING; + PR_Unlock(jobp->jlock); + PR_Unlock(tp->jobq.lock); +#endif + + jobp->job_func(jobp->job_arg); + if (!jobp->joinable) { + delete_job(jobp); + } else { + PR_Lock(jobp->jlock); + jobp->status = JOB_COMPLETED; + PR_NotifyCondVar(jobp->join_cv); + PR_Unlock(jobp->jlock); + } + } + PR_Lock(tp->jobq.lock); + tp->current_threads--; + PR_Unlock(tp->jobq.lock); +} + +/* + * add a job to the work queue + */ +static void +add_to_jobq(PRThreadPool *tp, PRJob *jobp) +{ + /* + * add to jobq + */ +#ifdef OPT_WINNT + PR_Lock(tp->jobq.lock); + tp->jobq.cnt++; + PR_Unlock(tp->jobq.lock); + /* + * notify worker thread(s) + */ + PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, + FALSE, &jobp->nt_notifier.overlapped); +#else + PR_Lock(tp->jobq.lock); + PR_APPEND_LINK(&jobp->links,&tp->jobq.list); + jobp->status = JOB_QUEUED; + tp->jobq.cnt++; + if ((tp->idle_threads < tp->jobq.cnt) && + (tp->current_threads < tp->max_threads)) { + PRThread *thr; + wthread *wthrp; + /* + * increment thread count and unlock the jobq lock + */ + tp->current_threads++; + PR_Unlock(tp->jobq.lock); + /* create new worker thread */ + thr = PR_CreateThread(PR_USER_THREAD, wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); + PR_Lock(tp->jobq.lock); + if (NULL == thr) { + tp->current_threads--; + } else { + wthrp = PR_NEWZAP(wthread); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); + } + } + /* + * wakeup a worker thread + */ + PR_NotifyCondVar(tp->jobq.cv); + PR_Unlock(tp->jobq.lock); +#endif +} + +/* + * io worker thread function + */ +static void io_wstart(void *arg) +{ +PRThreadPool *tp = (PRThreadPool *) arg; +int pollfd_cnt, pollfds_used; +int rv; +PRCList *qp; +PRPollDesc *pollfds; +PRJob **polljobs; +int poll_timeout; 
+PRIntervalTime now; + + /* + * scan io_jobq + * construct poll list + * call PR_Poll + * for all fds, for which poll returns true, move the job to + * jobq and wakeup worker thread. + */ + while (!tp->shutdown) { + PRJob *jobp; + + pollfd_cnt = tp->ioq.cnt + 10; + if (pollfd_cnt > tp->ioq.npollfds) { + + /* + * re-allocate pollfd array if the current one is not large + * enough + */ + if (NULL != tp->ioq.pollfds) + PR_Free(tp->ioq.pollfds); + tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * + (sizeof(PRPollDesc) + sizeof(PRJob *))); + PR_ASSERT(NULL != tp->ioq.pollfds); + /* + * array of pollfds + */ + pollfds = tp->ioq.pollfds; + tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); + /* + * parallel array of jobs + */ + polljobs = tp->ioq.polljobs; + tp->ioq.npollfds = pollfd_cnt; + } + + pollfds_used = 0; + /* + * add the notify fd; used for unblocking io thread(s) + */ + pollfds[pollfds_used].fd = tp->ioq.notify_fd; + pollfds[pollfds_used].in_flags = PR_POLL_READ; + pollfds[pollfds_used].out_flags = 0; + polljobs[pollfds_used] = NULL; + pollfds_used++; + /* + * fill in the pollfd array + */ + PR_Lock(tp->ioq.lock); + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) { + jobp = JOB_LINKS_PTR(qp); + if (pollfds_used == (pollfd_cnt)) + break; + pollfds[pollfds_used].fd = jobp->iod->socket; + pollfds[pollfds_used].in_flags = jobp->io_poll_flags; + pollfds[pollfds_used].out_flags = 0; + polljobs[pollfds_used] = jobp; + + pollfds_used++; + } + if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { + qp = tp->ioq.list.next; + jobp = JOB_LINKS_PTR(qp); + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) + poll_timeout = PR_INTERVAL_NO_TIMEOUT; + else if (PR_INTERVAL_NO_WAIT == jobp->timeout) + poll_timeout = PR_INTERVAL_NO_WAIT; + else { + poll_timeout = jobp->absolute - PR_IntervalNow(); + if (poll_timeout <= 0) /* already timed out */ + poll_timeout = PR_INTERVAL_NO_WAIT; + } + } else { + poll_timeout = PR_INTERVAL_NO_TIMEOUT; + } + 
PR_Unlock(tp->ioq.lock); + + /* + * XXXX + * should retry if more jobs have been added to the queue? + * + */ + PR_ASSERT(pollfds_used <= pollfd_cnt); + rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); + + if (tp->shutdown) { + break; + } + + if (rv > 0) { + /* + * at least one io event is set + */ + PRStatus rval_status; + PRInt32 index; + + PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); + /* + * reset the pollable event, if notified + */ + if (pollfds[0].out_flags & PR_POLL_READ) { + rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); + PR_ASSERT(PR_SUCCESS == rval_status); + } + + for(index = 1; index < (pollfds_used); index++) { + PRInt16 events = pollfds[index].in_flags; + PRInt16 revents = pollfds[index].out_flags; + jobp = polljobs[index]; + + if ((revents & PR_POLL_NVAL) || /* busted in all cases */ + ((events & PR_POLL_WRITE) && + (revents & PR_POLL_HUP))) { /* write op & hup */ + PR_Lock(tp->ioq.lock); + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + PR_Unlock(tp->ioq.lock); + + /* set error */ + if (PR_POLL_NVAL & revents) + jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; + else if (PR_POLL_HUP & revents) + jobp->iod->error = PR_CONNECT_RESET_ERROR; + + /* + * add to jobq + */ + add_to_jobq(tp, jobp); + } else if (revents & events) { + /* + * add to jobq + */ + PR_Lock(tp->ioq.lock); + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + PR_Unlock(tp->ioq.lock); + + jobp->iod->error = 0; + add_to_jobq(tp, jobp); + } + } + } + /* + * timeout processing + */ + now = PR_IntervalNow(); + PR_Lock(tp->ioq.lock); + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) { + jobp = JOB_LINKS_PTR(qp); + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) + break; + if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && + ((PRInt32)(jobp->absolute - now) > 0)) + break; + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->iod->error = PR_IO_TIMEOUT_ERROR; + add_to_jobq(tp, jobp); + } + PR_Unlock(tp->ioq.lock); + } +} + +/* + * 
timer worker thread function + */ +static void timer_wstart(void *arg) +{ +PRThreadPool *tp = (PRThreadPool *) arg; +PRCList *qp; +PRIntervalTime timeout; +PRIntervalTime now; + + /* + * call PR_WaitCondVar with minimum value of all timeouts + */ + while (!tp->shutdown) { + PRJob *jobp; + + PR_Lock(tp->timerq.lock); + if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { + timeout = PR_INTERVAL_NO_TIMEOUT; + } else { + PRCList *qp; + + qp = tp->timerq.list.next; + jobp = JOB_LINKS_PTR(qp); + + timeout = jobp->absolute - PR_IntervalNow(); + if (timeout <= 0) + timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ + } + if (PR_INTERVAL_NO_WAIT != timeout) + PR_WaitCondVar(tp->timerq.cv, timeout); + if (tp->shutdown) { + PR_Unlock(tp->timerq.lock); + break; + } + /* + * move expired-timer jobs to jobq + */ + now = PR_IntervalNow(); + while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { + qp = tp->timerq.list.next; + jobp = JOB_LINKS_PTR(qp); + + if ((PRInt32)(jobp->absolute - now) > 0) { + break; + } + /* + * job timed out + */ + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->timerq.cnt--; + add_to_jobq(tp, jobp); + } + PR_Unlock(tp->timerq.lock); + } +} + +static void +delete_threadpool(PRThreadPool *tp) +{ + if (NULL != tp) { + if (NULL != tp->shutdown_cv) + PR_DestroyCondVar(tp->shutdown_cv); + if (NULL != tp->jobq.cv) + PR_DestroyCondVar(tp->jobq.cv); + if (NULL != tp->jobq.lock) + PR_DestroyLock(tp->jobq.lock); +#ifdef OPT_WINNT + if (NULL != tp->jobq.nt_completion_port) + CloseHandle(tp->jobq.nt_completion_port); +#endif + /* Timer queue */ + if (NULL != tp->timerq.cv) + PR_DestroyCondVar(tp->timerq.cv); + if (NULL != tp->timerq.lock) + PR_DestroyLock(tp->timerq.lock); + + if (NULL != tp->ioq.lock) + PR_DestroyLock(tp->ioq.lock); + if (NULL != tp->ioq.pollfds) + PR_Free(tp->ioq.pollfds); + if (NULL != tp->ioq.notify_fd) + PR_DestroyPollableEvent(tp->ioq.notify_fd); + PR_Free(tp); + } + return; +} + +static PRThreadPool * +alloc_threadpool() +{ +PRThreadPool *tp; + + tp = 
PR_CALLOC(sizeof(*tp)); + if (NULL == tp) + goto failed; + tp->jobq.lock = PR_NewLock(); + if (NULL == tp->jobq.lock) + goto failed; + tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); + if (NULL == tp->jobq.cv) + goto failed; +#ifdef OPT_WINNT + tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + NULL, 0, 0); + if (NULL == tp->jobq.nt_completion_port) + goto failed; +#endif + + tp->ioq.lock = PR_NewLock(); + if (NULL == tp->ioq.lock) + goto failed; + + /* Timer queue */ + + tp->timerq.lock = PR_NewLock(); + if (NULL == tp->timerq.lock) + goto failed; + tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); + if (NULL == tp->timerq.cv) + goto failed; + + tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); + if (NULL == tp->shutdown_cv) + goto failed; + tp->ioq.notify_fd = PR_NewPollableEvent(); + if (NULL == tp->ioq.notify_fd) + goto failed; + return tp; +failed: + delete_threadpool(tp); + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); + return NULL; +} + +/* Create thread pool */ +PR_IMPLEMENT(PRThreadPool *) +PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, + PRSize stacksize) +{ +PRThreadPool *tp; +PRThread *thr; +int i; +wthread *wthrp; + + tp = alloc_threadpool(); + if (NULL == tp) + return NULL; + + tp->init_threads = initial_threads; + tp->max_threads = max_threads; + tp->stacksize = stacksize; + PR_INIT_CLIST(&tp->jobq.list); + PR_INIT_CLIST(&tp->ioq.list); + PR_INIT_CLIST(&tp->timerq.list); + PR_INIT_CLIST(&tp->jobq.wthreads); + PR_INIT_CLIST(&tp->ioq.wthreads); + PR_INIT_CLIST(&tp->timerq.wthreads); + tp->shutdown = PR_FALSE; + + PR_Lock(tp->jobq.lock); + for(i=0; i < initial_threads; ++i) { + + thr = PR_CreateThread(PR_USER_THREAD, wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); + } + tp->current_threads = initial_threads; + + thr = 
PR_CreateThread(PR_USER_THREAD, io_wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); + + thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); + + PR_Unlock(tp->jobq.lock); + return tp; +} + +static void +delete_job(PRJob *jobp) +{ + if (NULL != jobp) { + if (NULL != jobp->jlock) { + PR_DestroyLock(jobp->jlock); + jobp->jlock = NULL; + } + if (NULL != jobp->join_cv) { + PR_DestroyCondVar(jobp->join_cv); + jobp->join_cv = NULL; + } + jobp->status = JOB_FREED; + PR_DELETE(jobp); + } +} + +static PRJob * +alloc_job(PRBool joinable) +{ + PRJob *jobp; + + jobp = PR_NEWZAP(PRJob); + if (NULL == jobp) + goto failed; + jobp->jlock = PR_NewLock(); + if (NULL == jobp->jlock) + goto failed; + if (joinable) { + jobp->join_cv = PR_NewCondVar(jobp->jlock); + if (NULL == jobp->join_cv) + goto failed; + } else { + jobp->join_cv = NULL; + } +#ifdef OPT_WINNT + jobp->nt_notifier.jobp = jobp; +#endif + return jobp; +failed: + delete_job(jobp); + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); + return NULL; +} + +/* queue a job */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob(PRThreadPool *tpool, JobFn fn, void *arg, PRBool joinable) +{ + PRJob *jobp; + + jobp = alloc_job(joinable); + if (NULL == jobp) + return NULL; + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; + jobp->joinable = joinable; + + add_to_jobq(tpool, jobp); + return jobp; +} + +/* queue a job, when a socket is readable */ +static PRJob * +queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, JobFn fn, void * arg, + PRBool joinable, io_op_type op) +{ + PRJob *jobp; + PRIntervalTime now; + + jobp = alloc_job(joinable); + if 
(NULL == jobp) { + return NULL; + } + + /* + * Add a new job to io_jobq + * wakeup io worker thread + */ + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->status = JOB_ON_IOQ; + jobp->tpool = tpool; + jobp->joinable = joinable; + jobp->iod = iod; + if (JOB_IO_READ == op) { + jobp->io_op = JOB_IO_READ; + jobp->io_poll_flags = PR_POLL_READ; + } else if (JOB_IO_WRITE == op) { + jobp->io_op = JOB_IO_WRITE; + jobp->io_poll_flags = PR_POLL_WRITE; + } else if (JOB_IO_ACCEPT == op) { + jobp->io_op = JOB_IO_ACCEPT; + jobp->io_poll_flags = PR_POLL_READ; + } else if (JOB_IO_CONNECT == op) { + jobp->io_op = JOB_IO_CONNECT; + jobp->io_poll_flags = PR_POLL_WRITE; + } else { + delete_job(jobp); + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return NULL; + } + + jobp->timeout = iod->timeout; + if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || + (PR_INTERVAL_NO_WAIT == iod->timeout)) { + jobp->absolute = iod->timeout; + } else { + now = PR_IntervalNow(); + jobp->absolute = now + iod->timeout; + } + + + PR_Lock(tpool->ioq.lock); + + if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || + (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { + PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); + } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { + PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); + } else { + PRCList *qp; + PRJob *tmp_jobp; + /* + * insert into the timeout-sorted ioq + */ + for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; + qp = qp->prev) { + tmp_jobp = JOB_LINKS_PTR(qp); + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { + break; + } + } + PR_INSERT_AFTER(&jobp->links,qp); + } + + tpool->ioq.cnt++; + /* + * notify io worker thread(s) + */ + PR_Unlock(tpool->ioq.lock); + notify_ioq(tpool); + return jobp; +} + +/* queue a job, when a socket is readable */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, JobFn fn, void * arg, + PRBool joinable) +{ + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); +} + +/* queue a job, when a 
socket is writeable */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, JobFn fn,void * arg, + PRBool joinable) +{ + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); +} + + +/* queue a job, when a socket has a pending connection */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, JobFn fn, void * arg, + PRBool joinable) +{ + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); +} + +/* queue a job, when a socket can be connected */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, PRNetAddr *addr, + JobFn fn, void * arg, PRBool joinable) +{ + /* + * not implemented + */ + return NULL; +} + +/* queue a job, when a timer expires */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, + JobFn fn, void * arg, PRBool joinable) +{ + PRIntervalTime now; + PRJob *jobp; + + if (PR_INTERVAL_NO_TIMEOUT == timeout) { + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return NULL; + } + if (PR_INTERVAL_NO_WAIT == timeout) { + /* + * no waiting; add to jobq right away + */ + return(PR_QueueJob(tpool, fn, arg, joinable)); + } + jobp = alloc_job(joinable); + if (NULL == jobp) { + return NULL; + } + + /* + * Add a new job to timer_jobq + * wakeup timer worker thread + */ + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->status = JOB_ON_TIMERQ; + jobp->tpool = tpool; + jobp->joinable = joinable; + jobp->timeout = timeout; + + now = PR_IntervalNow(); + jobp->absolute = now + timeout; + + + PR_Lock(tpool->timerq.lock); + if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) + PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); + else { + PRCList *qp; + PRJob *tmp_jobp; + /* + * insert into the sorted timer jobq + */ + for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; + qp = qp->prev) { + tmp_jobp = JOB_LINKS_PTR(qp); + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { + break; + } + } + 
PR_INSERT_AFTER(&jobp->links,qp); + } + tpool->timerq.cnt++; + /* + * notify timer worker thread(s) + */ + notify_timerq(tpool); + PR_Unlock(tpool->timerq.lock); + return jobp; +} + +static void +notify_timerq(PRThreadPool *tp) +{ + /* + * wakeup the timer thread(s) + */ + PR_NotifyCondVar(tp->timerq.cv); +} + +static void +notify_ioq(PRThreadPool *tp) +{ +PRStatus rval_status; + + /* + * wakeup the io thread(s) + */ + rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); + PR_ASSERT(PR_SUCCESS == rval_status); +} + +/* + * cancel a job + * + * XXXX: is this needed? likely to be removed + */ +PR_IMPLEMENT(PRStatus) +PR_CancelJob(PRJob *jobp) { + + PRStatus rval = PR_FAILURE; + PRThreadPool *tp; + + if (JOB_QUEUED == jobp->status) { + /* + * now, check again while holding thread pool lock + */ + tp = jobp->tpool; + PR_Lock(tp->jobq.lock); + PR_Lock(jobp->jlock); + if (JOB_QUEUED == jobp->status) { + PR_REMOVE_AND_INIT_LINK(&jobp->links); + if (!jobp->joinable) { + PR_Unlock(jobp->jlock); + delete_job(jobp); + } else { + jobp->status = JOB_CANCELED; + PR_NotifyCondVar(jobp->join_cv); + PR_Unlock(jobp->jlock); + } + rval = PR_SUCCESS; + } + PR_Unlock(tp->jobq.lock); + } + return rval; +} + +/* join a job, wait until completion */ +PR_IMPLEMENT(PRStatus) +PR_JoinJob(PRJob *jobp) +{ + /* + * No references to the thread pool + */ + if (!jobp->joinable) { + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return PR_FAILURE; + } + PR_Lock(jobp->jlock); + while((JOB_COMPLETED != jobp->status) && + (JOB_CANCELED != jobp->status)) + PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); + + PR_Unlock(jobp->jlock); + delete_job(jobp); + return PR_SUCCESS; +} + +/* shutdown threadpool */ +PR_IMPLEMENT(PRStatus) +PR_ShutdownThreadPool(PRThreadPool *tpool) +{ +PRStatus rval = PR_SUCCESS; + + PR_Lock(tpool->jobq.lock); + tpool->shutdown = PR_TRUE; + PR_NotifyAllCondVar(tpool->shutdown_cv); + PR_Unlock(tpool->jobq.lock); + + return rval; +} + +/* + * join thread pool + * wait for 
termination of worker threads + * reclaim threadpool resources + */ +PR_IMPLEMENT(PRStatus) +PR_JoinThreadPool(PRThreadPool *tpool) +{ +PRStatus rval = PR_SUCCESS; +PRCList *head; +PRStatus rval_status; + + PR_Lock(tpool->jobq.lock); + while (!tpool->shutdown) + PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); + + /* + * wakeup worker threads + */ +#ifdef OPT_WINNT + /* + * post shutdown notification for all threads + */ + { + int i; + for(i=0; i < tpool->current_threads; i++) { + PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, + TRUE, NULL); + } + } +#else + PR_NotifyAllCondVar(tpool->jobq.cv); +#endif + + /* + * wakeup io thread(s) + */ + PR_Lock(tpool->ioq.lock); + notify_ioq(tpool); + PR_Unlock(tpool->ioq.lock); + + /* + * wakeup timer thread(s) + */ + PR_Lock(tpool->timerq.lock); + notify_timerq(tpool); + PR_Unlock(tpool->timerq.lock); + + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->jobq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + PR_Unlock(tpool->jobq.lock); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + PR_Lock(tpool->jobq.lock); + } + PR_Unlock(tpool->jobq.lock); + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->ioq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + } + + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->timerq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + } + + /* + * Delete unjoinable jobs; joinable jobs must be reclaimed by the user + */ + while 
(!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->jobq.list); + PR_REMOVE_AND_INIT_LINK(head); + jobp = JOB_LINKS_PTR(head); + tpool->jobq.cnt--; + + if (!jobp->joinable) { + delete_job(jobp); + } else { + PR_Lock(jobp->jlock); + jobp->status = JOB_CANCELED; + PR_NotifyCondVar(jobp->join_cv); + PR_Unlock(jobp->jlock); + } + } + + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->ioq.list); + PR_REMOVE_AND_INIT_LINK(head); + tpool->ioq.cnt--; + jobp = JOB_LINKS_PTR(head); + if (!jobp->joinable) { + delete_job(jobp); + } else { + PR_Lock(jobp->jlock); + jobp->status = JOB_CANCELED; + PR_NotifyCondVar(jobp->join_cv); + PR_Unlock(jobp->jlock); + } + } + + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->timerq.list); + PR_REMOVE_AND_INIT_LINK(head); + tpool->timerq.cnt--; + jobp = JOB_LINKS_PTR(head); + if (!jobp->joinable) { + delete_job(jobp); + } else { + PR_Lock(jobp->jlock); + jobp->status = JOB_CANCELED; + PR_NotifyCondVar(jobp->join_cv); + PR_Unlock(jobp->jlock); + } + } + + PR_ASSERT(0 == tpool->jobq.cnt); + PR_ASSERT(0 == tpool->ioq.cnt); + PR_ASSERT(0 == tpool->timerq.cnt); + + delete_threadpool(tpool); + return rval; +} diff --git a/nsprpub/pr/tests/Makefile b/nsprpub/pr/tests/Makefile index 1d19c9dd55d..7bf967f6b15 100644 --- a/nsprpub/pr/tests/Makefile +++ b/nsprpub/pr/tests/Makefile @@ -150,6 +150,8 @@ CSRCS = \ system.c \ testbit.c \ testfile.c \ + thrpool_server.c \ + thrpool_client.c \ threads.c \ thruput.c \ timemac.c \ diff --git a/nsprpub/pr/tests/thrpool_client.c b/nsprpub/pr/tests/thrpool_client.c new file mode 100644 index 00000000000..d06930f35c4 --- /dev/null +++ b/nsprpub/pr/tests/thrpool_client.c @@ -0,0 +1,374 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* + * The contents of this file are subject to the Netscape Public License + * Version 1.1 (the "NPL"); you may not use 
this file except in + * compliance with the NPL. You may obtain a copy of the NPL at + * http://www.mozilla.org/NPL/ + * + * Software distributed under the NPL is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL + * for the specific language governing rights and limitations under the + * NPL. + * + * The Initial Developer of this code under the NPL is Netscape + * Communications Corporation. Portions created by Netscape are + * Copyright (C) 1999 Netscape Communications Corporation. All Rights + * Reserved. + */ + +/*********************************************************************** +** +** Name: thrpool_client.c +** +** Description: Test threadpool functionality. +** +** Modification History: +*/ +#include "primpl.h" +#include "prtpool.h" + +#include "plgetopt.h" + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#ifdef XP_UNIX +#include <sys/mman.h> +#endif +#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) +#include <pthread.h> +#endif + +#ifdef WIN32 +#include <process.h> +#endif + +static int _debug_on = 0; +static int server_port = -1; +static char *program_name = NULL; + +#ifdef XP_MAC +#include "prlog.h" +#include "prsem.h" +int fprintf(FILE *stream, const char *fmt, ...) 
+{ + PR_LogPrint(fmt); + return 0; +} +#define printf PR_LogPrint +extern void SetupMacPrintfLog(char *logFile); +#else +#include "obsolete/prsem.h" +#endif + +#ifdef XP_PC +#define mode_t int +#endif + +#define DPRINTF(arg) if (_debug_on) printf arg + +#define BUF_DATA_SIZE (2 * 1024) +#define TCP_MESG_SIZE 1024 +#define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */ + +#define NUM_TCP_CONNECTIONS_PER_CLIENT 10 +#define NUM_TCP_MESGS_PER_CONNECTION 10 +#define TCP_SERVER_PORT 10000 + +static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS; +static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT; +static PRInt32 tcp_mesg_size = TCP_MESG_SIZE; +static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION; + +int failed_already=0; + +typedef struct buffer { + char data[BUF_DATA_SIZE]; +} buffer; + +PRNetAddr tcp_server_addr, udp_server_addr; + +typedef struct Client_Param { + PRNetAddr server_addr; + PRMonitor *exit_mon; /* monitor to signal on exit */ + PRInt32 *exit_counter; /* counter to decrement, before exit */ + PRInt32 datalen; +} Client_Param; + +/* + * readn + * read data from sockfd into buf + */ +static PRInt32 +readn(PRFileDesc *sockfd, char *buf, int len) +{ + int rem; + int bytes; + int offset = 0; + PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; + + for (rem=len; rem; offset += bytes, rem -= bytes) { + DPRINTF(("thread = 0x%lx: calling PR_Recv, bytes = %d\n", + PR_GetCurrentThread(), rem)); + bytes = PR_Recv(sockfd, buf + offset, rem, 0, + timeout); + DPRINTF(("thread = 0x%lx: returning from PR_Recv, bytes = %d\n", + PR_GetCurrentThread(), bytes)); + if (bytes < 0) { + return -1; + } + } + return len; +} + +/* + * writen + * write data from buf to sockfd + */ +static PRInt32 +writen(PRFileDesc *sockfd, char *buf, int len) +{ + int rem; + int bytes; + int offset = 0; + + for (rem=len; rem; offset += bytes, rem -= bytes) { + DPRINTF(("thread = 0x%lx: calling PR_Send, bytes = %d\n", + PR_GetCurrentThread(), rem)); 
+ bytes = PR_Send(sockfd, buf + offset, rem, 0, + PR_INTERVAL_NO_TIMEOUT); + DPRINTF(("thread = 0x%lx: returning from PR_Send, bytes = %d\n", + PR_GetCurrentThread(), bytes)); + if (bytes <= 0) + return -1; + } + return len; +} + +/* + * TCP_Client + * Client job + * Connect to the server at the address specified in the argument. + * Fill in a buffer, write data to server, read it back and check + * for data corruption. + * Close the socket for server connection + */ +static void PR_CALLBACK +TCP_Client(void *arg) +{ + Client_Param *cp = (Client_Param *) arg; + PRFileDesc *sockfd; + buffer *in_buf, *out_buf; + union PRNetAddr netaddr; + PRInt32 bytes, i, j; + + + DPRINTF(("TCP client started\n")); + bytes = cp->datalen; + out_buf = PR_NEW(buffer); + if (out_buf == NULL) { + fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name); + failed_already=1; + return; + } + in_buf = PR_NEW(buffer); + if (in_buf == NULL) { + fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name); + failed_already=1; + return; + } + netaddr.inet.family = cp->server_addr.inet.family; + netaddr.inet.port = cp->server_addr.inet.port; + netaddr.inet.ip = cp->server_addr.inet.ip; + + for (i = 0; i < num_tcp_connections_per_client; i++) { + if ((sockfd = PR_OpenTCPSocket(PR_AF_INET)) == NULL) { + fprintf(stderr,"%s: PR_OpenTCPSocket failed\n", program_name); + failed_already=1; + return; + } + + DPRINTF(("TCP client connecting to server:%d\n", server_port)); + if (PR_Connect(sockfd, &netaddr,PR_INTERVAL_NO_TIMEOUT) < 0){ + fprintf(stderr, "PR_Connect failed: (%ld, %ld)\n", + PR_GetError(), PR_GetOSError()); + failed_already=1; + return; + } + for (j = 0; j < num_tcp_mesgs_per_connection; j++) { + /* + * fill in random data + */ + memset(out_buf->data, ((PRInt32) (&netaddr)) + i + j, bytes); + /* + * write to server + */ + if (writen(sockfd, out_buf->data, bytes) < bytes) { + fprintf(stderr,"%s: ERROR - TCP_Client:writen\n", program_name); + failed_already=1; + return; + } + 
/* + DPRINTF(("TCP Client [0x%lx]: out_buf = 0x%lx out_buf[0] = 0x%lx\n", + PR_GetCurrentThread(), out_buf, (*((int *) out_buf->data)))); + */ + if (readn(sockfd, in_buf->data, bytes) < bytes) { + fprintf(stderr,"%s: ERROR - TCP_Client:readn\n", program_name); + failed_already=1; + return; + } + /* + * verify the data read + */ + if (memcmp(in_buf->data, out_buf->data, bytes) != 0) { + fprintf(stderr,"%s: ERROR - data corruption\n", program_name); + failed_already=1; + return; + } + } + /* + * shutdown reads and writes + */ + if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) { + fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name); + failed_already=1; + } + PR_Close(sockfd); + } + + PR_DELETE(out_buf); + PR_DELETE(in_buf); + + /* + * Decrement exit_counter and notify parent thread + */ + + PR_EnterMonitor(cp->exit_mon); + --(*cp->exit_counter); + PR_Notify(cp->exit_mon); + PR_ExitMonitor(cp->exit_mon); + DPRINTF(("TCP_Client exiting\n")); +} + +/* + * TCP_Socket_Client_Server_Test - concurrent server test + * + * Each client connects to the server and sends a chunk of data + * For each connection, server reads the data + * from the client and sends it back to the client, unmodified. + * Each client checks that data received from server is same as the + * data it sent to the server. 
+ * + */ + +static PRInt32 +TCP_Socket_Client_Server_Test(void) +{ + int i; + Client_Param *cparamp; + PRMonitor *mon2; + PRInt32 datalen; + PRInt32 connections = 0; + PRThread *thr; + + datalen = tcp_mesg_size; + connections = 0; + + mon2 = PR_NewMonitor(); + if (mon2 == NULL) { + fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name); + failed_already=1; + return -1; + } + + /* + * Start client jobs + */ + cparamp = PR_NEW(Client_Param); + if (cparamp == NULL) { + fprintf(stderr,"%s: PR_NEW failed\n", program_name); + failed_already=1; + return -1; + } + cparamp->server_addr.inet.family = PR_AF_INET; + cparamp->server_addr.inet.port = PR_htons(server_port); + cparamp->server_addr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK); + cparamp->exit_mon = mon2; + cparamp->exit_counter = &connections; + cparamp->datalen = datalen; + for (i = 0; i < num_tcp_clients; i++) { + thr = PR_CreateThread(PR_USER_THREAD, TCP_Client, (void *)cparamp, + PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, 0); + if (NULL == thr) { + fprintf(stderr,"%s: PR_CreateThread failed\n", program_name); + failed_already=1; + return -1; + } + PR_EnterMonitor(mon2); + connections++; + PR_ExitMonitor(mon2); + DPRINTF(("Created TCP client = 0x%lx\n", thr)); + } + /* Wait for client jobs to exit */ + PR_EnterMonitor(mon2); + while (0 != connections) { + PR_Wait(mon2, PR_INTERVAL_NO_TIMEOUT); + DPRINTF(("Client job count = %d\n", connections)); + } + PR_ExitMonitor(mon2); + printf("%30s","TCP_Socket_Client_Server_Test:"); + printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l, + num_tcp_clients, num_tcp_connections_per_client); + printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":", + num_tcp_mesgs_per_connection, tcp_mesg_size); + + PR_DELETE(cparamp); + return 0; +} + +/************************************************************************/ + +int +main(int argc, char **argv) +{ + /* + * -d debug mode + */ + PLOptStatus os; + PLOptState *opt; + program_name = 
argv[0]; + + opt = PL_CreateOptState(argc, argv, "dp:"); + while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) + { + if (PL_OPT_BAD == os) continue; + switch (opt->option) + { + case 'd': /* debug mode */ + _debug_on = 1; + break; + case 'p': + server_port = atoi(opt->value); + break; + default: + break; + } + } + PL_DestroyOptState(opt); + + PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); + PR_STDIO_INIT(); + +#ifdef XP_MAC + SetupMacPrintfLog("socket.log"); +#endif + PR_SetConcurrency(4); + + TCP_Socket_Client_Server_Test(); + + PR_Cleanup(); + if (failed_already) + return 1; + else + return 0; +} diff --git a/nsprpub/pr/tests/thrpool_server.c b/nsprpub/pr/tests/thrpool_server.c new file mode 100644 index 00000000000..0452a0a9b15 --- /dev/null +++ b/nsprpub/pr/tests/thrpool_server.c @@ -0,0 +1,558 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* + * The contents of this file are subject to the Netscape Public License + * Version 1.1 (the "NPL"); you may not use this file except in + * compliance with the NPL. You may obtain a copy of the NPL at + * http://www.mozilla.org/NPL/ + * + * Software distributed under the NPL is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL + * for the specific language governing rights and limitations under the + * NPL. + * + * The Initial Developer of this code under the NPL is Netscape + * Communications Corporation. Portions created by Netscape are + * Copyright (C) 1999 Netscape Communications Corporation. All Rights + * Reserved. + */ + +/*********************************************************************** +** +** Name: thrpool.c +** +** Description: Test threadpool functionality. 
+** +** Modification History: +*/ +#include "primpl.h" +#include "prtpool.h" + +#include "plgetopt.h" + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#ifdef XP_UNIX +#include <sys/mman.h> +#endif +#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) +#include <pthread.h> +#endif + +#ifdef WIN32 +#include <process.h> +#endif + +static int _debug_on = 0; +static char *program_name = NULL; +static void serve_client_write(void *arg); + +#ifdef XP_MAC +#include "prlog.h" +#include "prsem.h" +int fprintf(FILE *stream, const char *fmt, ...) +{ + PR_LogPrint(fmt); + return 0; +} +#define printf PR_LogPrint +extern void SetupMacPrintfLog(char *logFile); +#else +#include "obsolete/prsem.h" +#endif + +#ifdef XP_PC +#define mode_t int +#endif + +#define DPRINTF(arg) if (_debug_on) printf arg + + +#define BUF_DATA_SIZE (2 * 1024) +#define TCP_MESG_SIZE 1024 +#define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */ + + +#define NUM_TCP_CONNECTIONS_PER_CLIENT 10 +#define NUM_TCP_MESGS_PER_CONNECTION 10 +#define TCP_SERVER_PORT 10000 +#define SERVER_MAX_BIND_COUNT 100 + +static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS; +static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT; +static PRInt32 tcp_mesg_size = TCP_MESG_SIZE; +static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION; +static void TCP_Server_Accept(void *arg); + + +int failed_already=0; +typedef struct buffer { + char data[BUF_DATA_SIZE]; +} buffer; + + +typedef struct Server_Param { + PRJobIoDesc iod; /* socket to read from/write to */ + PRInt32 datalen; /* bytes of data transferred in each read/write */ + PRNetAddr netaddr; + PRMonitor *exit_mon; /* monitor to signal on exit */ + PRInt32 *job_counterp; /* counter to decrement, before exit */ + PRInt32 conn_counter; /* counter to decrement, before exit */ + PRThreadPool *tp; +} Server_Param; + +typedef struct Serve_Client_Param { + PRJobIoDesc iod; /* socket to read from/write to */ + PRInt32 datalen; /* bytes of data transferred in each read/write */ + PRMonitor 
*exit_mon; /* monitor to signal on exit */ + PRInt32 *job_counterp; /* counter to decrement, before exit */ + PRThreadPool *tp; +} Serve_Client_Param; + +typedef struct Session { + PRJobIoDesc iod; /* socket to read from/write to */ + buffer *in_buf; + PRInt32 bytes; + PRInt32 msg_num; + PRInt32 bytes_read; + PRMonitor *exit_mon; /* monitor to signal on exit */ + PRInt32 *job_counterp; /* counter to decrement, before exit */ + PRThreadPool *tp; +} Session; + +static void +serve_client_read(void *arg) +{ + Session *sp = (Session *) arg; + int rem; + int bytes; + int offset; + PRFileDesc *sockfd; + char *buf; + PRJob *jobp; + + PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; + + sockfd = sp->iod.socket; + buf = sp->in_buf->data; + + PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection); + PR_ASSERT(sp->bytes_read < sp->bytes); + + offset = sp->bytes_read; + rem = sp->bytes - offset; + bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout); + if (bytes < 0) { + return; + } + sp->bytes_read += bytes; + sp->iod.timeout = PR_SecondsToInterval(60); + if (sp->bytes_read < sp->bytes) { + jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + return; + } + PR_ASSERT(sp->bytes_read == sp->bytes); + DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num)); + + sp->iod.timeout = PR_SecondsToInterval(60); + jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + + return; +} + +static void +serve_client_write(void *arg) +{ + Session *sp = (Session *) arg; + int bytes; + PRFileDesc *sockfd; + char *buf; + PRJob *jobp; + + sockfd = sp->iod.socket; + buf = sp->in_buf->data; + + PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection); + + bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT); + PR_ASSERT(bytes == sp->bytes); + + if (bytes < 0) { + return; + } + DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num)); + sp->msg_num++; + if 
(sp->msg_num < num_tcp_mesgs_per_connection) { + sp->bytes_read = 0; + sp->iod.timeout = PR_SecondsToInterval(60); + jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + return; + } + + DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num)); + if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) { + fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name); + } + + PR_Close(sockfd); + PR_EnterMonitor(sp->exit_mon); + --(*sp->job_counterp); + PR_Notify(sp->exit_mon); + PR_ExitMonitor(sp->exit_mon); + + PR_DELETE(sp->in_buf); + PR_DELETE(sp); + + return; +} + +/* + * Serve_Client + * Thread, started by the server, for serving a client connection. + * Reads data from socket and writes it back, unmodified, and + * closes the socket + */ +static void PR_CALLBACK +Serve_Client(void *arg) +{ + Serve_Client_Param *scp = (Serve_Client_Param *) arg; + buffer *in_buf; + Session *sp; + PRJob *jobp; + + sp = PR_NEW(Session); + sp->iod = scp->iod; + + in_buf = PR_NEW(buffer); + if (in_buf == NULL) { + fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name); + failed_already=1; + return; + } + + sp->in_buf = in_buf; + sp->bytes = scp->datalen; + sp->msg_num = 0; + sp->bytes_read = 0; + sp->tp = scp->tp; + sp->exit_mon = scp->exit_mon; + sp->job_counterp = scp->job_counterp; + + sp->iod.timeout = PR_SecondsToInterval(60); + jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + PR_DELETE(scp); +} + +static int job_counter = 0; +/* + * TCP Server + * Server binds an address to a socket, starts a client process and + * listens for incoming connections. + * Each client connects to the server and sends a chunk of data + * Starts a Serve_Client job for each incoming connection, to read + * the data from the client and send it back to the client, unmodified. + * Each client checks that data received from server is same as the + * data it sent to the server. 
+ * Finally, the threadpool is shutdown + */ +static void PR_CALLBACK +TCP_Server(void *arg) +{ + PRThreadPool *tp = (PRThreadPool *) arg; + Server_Param *sp; + PRFileDesc *sockfd; + PRNetAddr netaddr; + PRMonitor *sc_mon; + PRJob *jobp; + int i; + + /* + * Create a tcp socket + */ + if ((sockfd = PR_NewTCPSocket()) == NULL) { + fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name); + return; + } + memset(&netaddr, 0 , sizeof(netaddr)); + netaddr.inet.family = PR_AF_INET; + netaddr.inet.port = PR_htons(TCP_SERVER_PORT); + netaddr.inet.ip = PR_htonl(PR_INADDR_ANY); + /* + * try a few times to bind server's address, if addresses are in + * use + */ + i = 0; + while (PR_Bind(sockfd, &netaddr) < 0) { + if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) { + netaddr.inet.port += 2; + if (i++ < SERVER_MAX_BIND_COUNT) + continue; + } + fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name); + perror("PR_Bind"); + failed_already=1; + return; + } + + if (PR_Listen(sockfd, 32) < 0) { + fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name); + failed_already=1; + return; + } + + if (PR_GetSockName(sockfd, &netaddr) < 0) { + fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name); + failed_already=1; + return; + } + + DPRINTF(( + "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n", + netaddr.inet.ip, netaddr.inet.port)); + + /* + * create the client process + */ + { +#define MAX_ARGS 4 + char *argv[MAX_ARGS + 1]; + int index = 0; + char port[32]; + char path[1024 + sizeof("/thrpool_client")]; + (void)getcwd(path, sizeof(path)); + (void)strcat(path, "/thrpool_client"); +#ifdef XP_PC + (void)strcat(path, ".exe"); +#endif + argv[index++] = path; + sprintf(port,"%d",PR_ntohs(netaddr.inet.port)); + if (_debug_on) + { + argv[index++] = "-d"; + argv[index++] = "-p"; + argv[index++] = port; + argv[index++] = NULL; + } else { + argv[index++] = "-p"; + argv[index++] = port; + argv[index++] = NULL; + } + PR_ASSERT(MAX_ARGS >= (index - 1)); + 
+ DPRINTF(("creating client process %s ...\n", path)); + if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) { + fprintf(stderr, + "thrpool_server: ERROR - PR_CreateProcessDetached failed\n"); + failed_already=1; + return; + } + } + + sc_mon = PR_NewMonitor(); + if (sc_mon == NULL) { + fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name); + failed_already=1; + return; + } + + sp = PR_NEW(Server_Param); + if (sp == NULL) { + fprintf(stderr,"%s: PR_NEW failed\n", program_name); + failed_already=1; + return; + } + sp->iod.socket = sockfd; + sp->iod.timeout = PR_SecondsToInterval(60); + sp->datalen = tcp_mesg_size; + sp->exit_mon = sc_mon; + sp->job_counterp = &job_counter; + sp->conn_counter = 0; + sp->tp = tp; + sp->netaddr = netaddr; + + DPRINTF(("TCP_Server: Accepting connections \n")); + + jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + return; +} + +static void +print_stats(void *arg) +{ + Server_Param *sp = (Server_Param *) arg; + PRThreadPool *tp = sp->tp; + PRInt32 counter; + PRJob *jobp; + + PR_EnterMonitor(sp->exit_mon); + counter = (*sp->job_counterp); + PR_ExitMonitor(sp->exit_mon); + + printf("PRINT_STATS: #client connections = %d\n",counter); + + + jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), + print_stats, sp, PR_FALSE); + + PR_ASSERT(NULL != jobp); +} + +static void +TCP_Server_Accept(void *arg) +{ + Server_Param *sp = (Server_Param *) arg; + PRThreadPool *tp = sp->tp; + Serve_Client_Param *scp; + PRFileDesc *newsockfd; + PRJob *jobp; + + if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr, + PR_INTERVAL_NO_TIMEOUT)) == NULL) { + fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name); + failed_already=1; + goto exit; + } + scp = PR_NEW(Serve_Client_Param); + if (scp == NULL) { + fprintf(stderr,"%s: PR_NEW failed\n", program_name); + failed_already=1; + goto exit; + } + + /* + * Start a Serve_Client job for each incoming connection + */ + 
scp->iod.socket = newsockfd; + scp->iod.timeout = PR_SecondsToInterval(60); + scp->datalen = tcp_mesg_size; + scp->exit_mon = sp->exit_mon; + scp->job_counterp = sp->job_counterp; + scp->tp = sp->tp; + + PR_EnterMonitor(sp->exit_mon); + (*sp->job_counterp)++; + PR_ExitMonitor(sp->exit_mon); + jobp = PR_QueueJob(tp, Serve_Client, scp, + PR_FALSE); + + PR_ASSERT(NULL != jobp); + DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp)); + + /* + * single-threaded update; no lock needed + */ + sp->conn_counter++; + if (sp->conn_counter < + (num_tcp_clients * num_tcp_connections_per_client)) { + jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + return; + } + jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), + print_stats, sp, PR_FALSE); + + PR_ASSERT(NULL != jobp); + DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp)); + +exit: + PR_EnterMonitor(sp->exit_mon); + /* Wait for server jobs to finish */ + while (0 != *sp->job_counterp) { + PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT); + DPRINTF(("TCP_Server: conn_counter = %d\n", + *sp->job_counterp)); + } + + PR_ExitMonitor(sp->exit_mon); + if (sp->iod.socket) { + PR_Close(sp->iod.socket); + } + PR_DestroyMonitor(sp->exit_mon); + printf("%30s","TCP_Socket_Client_Server_Test:"); + printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l, + num_tcp_clients, num_tcp_connections_per_client); + printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":", + num_tcp_mesgs_per_connection, tcp_mesg_size); + + DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name)); + PR_ShutdownThreadPool(sp->tp); + PR_DELETE(sp); +} + +/************************************************************************/ + +#define DEFAULT_INITIAL_THREADS 4 +#define DEFAULT_MAX_THREADS 100 +#define DEFAULT_STACKSIZE (512 * 1024) + +int +main(int argc, char **argv) +{ + PRInt32 initial_threads = DEFAULT_INITIAL_THREADS; + PRInt32 max_threads = 
DEFAULT_MAX_THREADS; + PRInt32 stacksize = DEFAULT_STACKSIZE; + PRThreadPool *tp = NULL; + PRStatus rv; + PRJob *jobp; + + /* + * -d debug mode + */ + PLOptStatus os; + PLOptState *opt; + + program_name = argv[0]; + opt = PL_CreateOptState(argc, argv, "d"); + while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) + { + if (PL_OPT_BAD == os) continue; + switch (opt->option) + { + case 'd': /* debug mode */ + _debug_on = 1; + break; + default: + break; + } + } + PL_DestroyOptState(opt); + + PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); + PR_STDIO_INIT(); + +#ifdef XP_MAC + SetupMacPrintfLog("socket.log"); +#endif + PR_SetConcurrency(4); + + tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize); + if (NULL == tp) { + printf("PR_CreateThreadPool failed\n"); + failed_already=1; + goto done; + } + jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE); + rv = PR_JoinJob(jobp); + PR_ASSERT(PR_SUCCESS == rv); + + DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name)); + rv = PR_JoinThreadPool(tp); + PR_ASSERT(PR_SUCCESS == rv); + DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name)); + +done: + PR_Cleanup(); + if (failed_already) return 1; + else return 0; +}