This commit is contained in:
dougt%netscape.com 2000-10-01 05:35:03 +00:00
Родитель 44ba6115cb
Коммит 25f36f554d
2 изменённых файлов: 148 добавлений и 114 удалений

Просмотреть файл

@ -357,9 +357,14 @@ nsThreadPool::~nsThreadPool()
Shutdown();
}
if (mRequestMonitor) {
PR_DestroyMonitor(mRequestMonitor);
}
if (mLock)
PR_DestroyLock(mLock);
if (mThreadExit)
PR_DestroyCondVar(mThreadExit);
if (mRequestAdded)
PR_DestroyCondVar(mRequestAdded);
if (mRequestsAtZero)
PR_DestroyCondVar(mRequestsAtZero);
}
NS_IMPL_THREADSAFE_ISUPPORTS1(nsThreadPool, nsIThreadPool)
@ -368,7 +373,12 @@ NS_IMETHODIMP
nsThreadPool::DispatchRequest(nsIRunnable* runnable)
{
nsresult rv;
nsAutoMonitor mon(mRequestMonitor);
nsAutoLock lock(mLock);
#if defined(PR_LOGGING)
nsCOMPtr<nsIThread> th;
nsIThread::GetCurrent(getter_AddRefs(th));
#endif
NS_ASSERTION(mMinThreads > 0, "forgot to call Init");
if (mShuttingDown) {
@ -382,8 +392,11 @@ nsThreadPool::DispatchRequest(nsIRunnable* runnable)
rv = mThreads->Count(&threadCount);
if (NS_FAILED(rv)) goto exit;
if ((requestCnt >= threadCount) && (threadCount < mMaxThreads)) {
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p: %d threads in pool, max = %d, requests = %d, creating new thread...\n",
th.get(), threadCount, mMaxThreads, requestCnt));
rv = AddThread();
if (NS_FAILED(rv)) goto exit;
}
@ -391,18 +404,27 @@ nsThreadPool::DispatchRequest(nsIRunnable* runnable)
// XXX for now AppendElement returns a PRBool
rv = ((PRBool) mRequests->AppendElement(runnable)) ? NS_OK : NS_ERROR_FAILURE;
if (NS_SUCCEEDED(rv)) {
rv = mon.Notify();
if (NS_FAILED(rv)) goto exit;
if (PR_FAILURE == PR_NotifyCondVar(mRequestAdded))
goto exit;
}
}
exit:
#if defined(PR_LOGGING)
nsCOMPtr<nsIThread> th;
nsIThread::GetCurrent(getter_AddRefs(th));
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p dispatched %p status %x\n", th.get(), runnable, rv));
#endif
return rv;
}
nsresult
nsThreadPool::RemoveThread(nsIThread* currentThread)
{
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p being removed\n",
currentThread));
nsresult rv = mThreads->DeleteLastElement(currentThread);
PR_NotifyCondVar(mThreadExit);
return rv;
}
@ -411,71 +433,66 @@ nsThreadPool::GetRequest(nsIThread* currentThread)
{
nsresult rv = NS_OK;
nsIRunnable* request = nsnull;
nsAutoMonitor mon(mRequestMonitor);
nsAutoLock lock(mLock);
PRUint32 cnt;
PRUint32 requestCnt;
while (PR_TRUE) {
rv = mRequests->Count(&cnt);
if (NS_FAILED(rv) || cnt != 0)
break;
if (mShuttingDown) {
rv = NS_ERROR_FAILURE;
break;
requestCnt = 0;
rv = mRequests->Count(&requestCnt);
if (NS_FAILED(rv)) {
return nsnull;
}
if (requestCnt > 0) {
request = (nsIRunnable*)mRequests->ElementAt(0);
NS_ASSERTION(request != nsnull, "null runnable");
PRBool removed = mRequests->RemoveElementAt(0);
NS_ASSERTION(removed, "nsISupportsArray broken");
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p got request %p\n",
currentThread, request));
if (removed && requestCnt == 1)
PR_NotifyCondVar(mRequestsAtZero);
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p got request %p\n",
currentThread, request));
return request;
}
// no requests, and we're not shutting down yet...
// if we have more than the minimum required threads already then
// we can just go away
PRUint32 threadCnt;
rv = mThreads->Count(&threadCnt);
if (NS_FAILED(rv)) break;
#if 0 /* XXX I had to take this code out to cut the number of threads back to
* the minimum count because if you just let the threads terminate
* themselves without joining with them, they'll just end up hanging around
* anyway. So we might as well keep them on the active list. Fix later!
*/
if (threadCnt > mMinThreads) {
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p being removed (%d threads left)\n",
currentThread.get(), threadCnt - 1));
rv = mThreads->RemoveElement(currentThread) ? NS_OK : NS_ERROR_FAILURE; // XXX fix result
if (NS_FAILED(rv)) break;
// release the thread once more because the thread pool isn't
// going to join with it now:
nsIThread* current = currentThread;
NS_RELEASE(current);
return nsnull; // causes nsThreadPoolRunnable::Run to quit
("nsIThreadPool thread %p: %d threads in pool, min = %d, exiting...\n",
currentThread, threadCnt, mMinThreads));
RemoveThread(currentThread);
return nsnull; // causes nsThreadPoolRunnable::Run to quit
}
#endif
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p waiting (%d threads in pool)\n",
currentThread, threadCnt));
rv = mon.Wait();
if (NS_FAILED(rv) || mShuttingDown) {
rv = NS_ERROR_FAILURE;
if (mShuttingDown)
break;
}
}
if (NS_SUCCEEDED(rv)) {
NS_ASSERTION((NS_SUCCEEDED(mRequests->Count(&cnt)) && cnt > 0),
"request queue out of sync");
request = (nsIRunnable*)mRequests->ElementAt(0);
NS_ASSERTION(request != nsnull, "null runnable");
PRBool removed = mRequests->RemoveElementAt(0);
NS_ASSERTION(removed, "nsISupportsArray broken");
(void)PR_WaitCondVar(mRequestAdded, PR_INTERVAL_NO_TIMEOUT);
}
// no requests, we are going to dump the thread.
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p got request %p\n",
currentThread, request));
return request;
("nsIThreadPool thread %p -- no more requests, exiting...\n",
currentThread));
RemoveThread(currentThread);
return nsnull;
}
NS_METHOD
@ -492,18 +509,12 @@ NS_IMETHODIMP
nsThreadPool::ProcessPendingRequests()
{
nsresult rv;
nsAutoCMonitor mon(this);
while (PR_TRUE) {
PRUint32 cnt;
rv = mRequests->Count(&cnt);
if (NS_FAILED(rv) || cnt == 0)
break;
rv = mon.Wait();
if (NS_FAILED(rv)) { // our thread was interrupted!
break;
}
(void)PR_WaitCondVar(mRequestsAtZero, PR_INTERVAL_NO_TIMEOUT);
}
#ifdef DEBUG
PRUint32 requestCount;
@ -513,12 +524,20 @@ nsThreadPool::ProcessPendingRequests()
return rv;
}
PRBool
nsThreadPool::InterruptThreads(nsISupports* aElement, void *aData)
{
nsCOMPtr<nsIThread> thread = do_QueryInterface(aElement);
NS_ASSERTION(thread, "bad thread in array");
(void) thread->Interrupt();
return PR_TRUE;
}
NS_IMETHODIMP
nsThreadPool::Shutdown()
{
nsresult rv = NS_OK;
PRUint32 count = 0;
PRUint32 i;
#if defined(PR_LOGGING)
nsCOMPtr<nsIThread> th;
@ -526,32 +545,30 @@ nsThreadPool::Shutdown()
#endif
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p shutting down\n", th.get()));
nsAutoLock lock(mLock);
mShuttingDown = PR_TRUE;
rv = ProcessPendingRequests();
NS_ASSERTION(NS_SUCCEEDED(rv), "ProcessPendingRequests failed");
// keep trying... don't bail with an error here
// then interrupt the threads and join them
rv = mThreads->Count(&count);
NS_ASSERTION(NS_SUCCEEDED(rv), "Count failed");
// fix Add assert that there are no more requests to be handled.
// then interrupt the threads
rv = mThreads->EnumerateForwards(nsThreadPool::InterruptThreads, nsnull);
NS_ASSERTION(NS_SUCCEEDED(rv), "Interruption failed");
if (NS_FAILED(rv)) return rv;
for (i = 0; i < count; i++) {
nsIThread* thread = (nsIThread*)(mThreads->ElementAt(0));
// we don't care about the error from Interrupt, because the
// thread may have already terminated its event loop
(void)thread->Interrupt();
rv = thread->Join();
// don't break out of the loop because of an error here
NS_ASSERTION(NS_SUCCEEDED(rv), "Join failed");
NS_RELEASE(thread);
rv = mThreads->RemoveElementAt(0);
// don't break out of the loop because of an error here
NS_ASSERTION(NS_SUCCEEDED(rv), "RemoveElementAt failed");
}
while (PR_TRUE) {
rv = mThreads->Count(&count);
NS_ASSERTION(NS_SUCCEEDED(rv), "Count failed");
if (NS_FAILED(rv)) return rv;
if (count == 0 )
break;
PR_WaitCondVar(mThreadExit, PR_INTERVAL_NO_TIMEOUT);
}
mThreads = nsnull;
return rv;
}
@ -578,11 +595,35 @@ nsThreadPool::Init(PRUint32 minThreadCount,
rv = NS_NewISupportsArray(getter_AddRefs(mRequests));
if (NS_FAILED(rv)) return rv;
mRequestMonitor = PR_NewMonitor();
if (mRequestMonitor == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
mLock = PR_NewLock();
if (mLock == nsnull)
goto cleanup;
return rv;
mRequestAdded = PR_NewCondVar(mLock);
if (mRequestAdded == nsnull)
goto cleanup;
mThreadExit = PR_NewCondVar(mLock);
if (mThreadExit == nsnull)
goto cleanup;
mRequestsAtZero = PR_NewCondVar(mLock);
if (mRequestsAtZero == nsnull)
goto cleanup;
return NS_OK;
cleanup:
if (mLock)
PR_DestroyLock(mLock);
if (mThreadExit)
PR_DestroyCondVar(mThreadExit);
if (mRequestAdded)
PR_DestroyCondVar(mRequestAdded);
if (mRequestsAtZero)
PR_DestroyCondVar(mRequestsAtZero);
return NS_ERROR_OUT_OF_MEMORY;
}
@ -590,7 +631,6 @@ nsresult
nsThreadPool::AddThread()
{
nsresult rv;
nsAutoCMonitor mon(this);
#ifdef DEBUG
PRUint32 cnt;
@ -606,27 +646,24 @@ nsThreadPool::AddThread()
return NS_ERROR_OUT_OF_MEMORY;
NS_ADDREF(runnable);
nsIThread* thread;
rv = NS_NewThread(&thread,
nsCOMPtr<nsIThread> thread;
rv = NS_NewThread(getter_AddRefs(thread),
runnable,
mStackSize,
PR_JOINABLE_THREAD, /* needed for Shutdown */
PR_UNJOINABLE_THREAD,
mPriority,
mScope);
// Let the thread own the runnable.
NS_RELEASE(runnable);
if (NS_FAILED(rv)) return rv;
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool adding new thread %p (%d total)\n",
thread, cnt + 1));
thread.get(), cnt + 1));
// wait for worker thread to be ready
rv = mon.Wait();
if (NS_SUCCEEDED(rv))
rv = mThreads->AppendElement(thread) ? NS_OK : NS_ERROR_FAILURE;
NS_RELEASE(thread);
rv = mThreads->AppendElement(thread) ? NS_OK : NS_ERROR_FAILURE;
return rv;
}
@ -673,12 +710,6 @@ nsThreadPoolRunnable::Run()
nsresult rv = NS_OK;
nsIRunnable* request;
// let the thread pool know we're ready
{
nsAutoCMonitor mon(mPool);
mon.Notify();
}
nsCOMPtr<nsIThread> currentThread;
nsIThread::GetCurrent(getter_AddRefs(currentThread));
@ -689,11 +720,6 @@ nsThreadPoolRunnable::Run()
rv = request->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
// let the thread pool know we've finished a run
{
nsAutoCMonitor mon(mPool);
mon.Notify();
}
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p completed %p status=%x\n",
currentThread.get(), request, rv));

Просмотреть файл

@ -37,7 +37,7 @@
#include "nsIThread.h"
#include "nsIThreadPool.h"
#include "nsISupportsArray.h"
#include "prcmon.h"
#include "prcvar.h"
#include "nsCOMPtr.h"
class nsThread : public nsIThread
@ -86,6 +86,9 @@ public:
nsIRunnable* GetRequest(nsIThread* currentThread);
nsresult AddThread();
nsresult RemoveThread(nsIThread* currentThread);
static PRBool InterruptThreads(nsISupports* aElement,
void *aData);
static NS_METHOD
Create(nsISupports* outer, const nsIID& aIID, void* *aInstancePtr);
@ -93,7 +96,12 @@ public:
protected:
nsCOMPtr<nsISupportsArray> mThreads;
nsCOMPtr<nsISupportsArray> mRequests;
PRMonitor* mRequestMonitor;
PRLock* mLock;
PRCondVar* mThreadExit;
PRCondVar* mRequestAdded;
PRCondVar* mRequestsAtZero;
PRUint32 mStackSize;
PRThreadPriority mPriority;