/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*-
 *
 * The contents of this file are subject to the Netscape Public License
 * Version 1.0 (the "NPL"); you may not use this file except in
 * compliance with the NPL. You may obtain a copy of the NPL at
 * http://www.mozilla.org/NPL/
 *
 * Software distributed under the NPL is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL
 * for the specific language governing rights and limitations under the
 * NPL.
 *
 * The Initial Developer of this code under the NPL is Netscape
 * Communications Corporation. Portions created by Netscape are
 * Copyright (C) 1998 Netscape Communications Corporation. All Rights
 * Reserved.
 */

#include "nsIFileTransportService.h"
#include "nsIStreamListener.h"
#include "nsIServiceManager.h"
#include "nsIInputStream.h"
#include "nsIThread.h"
#include "nsIEventQueue.h"
#include "nsIEventQueueService.h"
#include "prinrval.h"
#include "prmon.h"
#include "prcmon.h"
#include "prio.h"
#include "nsIFileStream.h"
#include "nsFileSpec.h"
#include "nsIBuffer.h"
#include "nsIBufferInputStream.h"
#include "nsIThread.h"
#include "nsISupportsArray.h"
#include "nsIChannel.h"
#include <stdio.h>

static NS_DEFINE_IID(kISupportsIID, NS_ISUPPORTS_IID);
static NS_DEFINE_CID(kFileTransportServiceCID, NS_FILETRANSPORTSERVICE_CID);
static NS_DEFINE_CID(kEventQueueServiceCID, NS_EVENTQUEUESERVICE_CID);

// Totals accumulated by every nsReader:
//   gDuration - sum of the intervals between OnStartBinding and OnStopBinding
//   gVolume   - number of bytes consumed by OnDataAvailable
// NOTE(review): each reader only holds its own monitor while updating these,
// so concurrent readers are not serialized against each other -- confirm
// whether exact totals matter for the parallel test.
PRIntervalTime gDuration = 0;
PRUint32 gVolume = 0;

////////////////////////////////////////////////////////////////////////////////

// nsReader is both the nsIRunnable executed by a reader thread and the
// nsIStreamListener that receives the binding/data notifications.
//
// Run() blocks until Init() has completed and then runs the event queue's
// EventLoop().  OnStopBinding() interrupts the thread handed to Init() in
// order to get it out of that loop.
class nsReader : public nsIRunnable, public nsIStreamListener {
public:
    NS_DECL_ISUPPORTS

    // Thread entry point.  Returns NS_ERROR_OUT_OF_MEMORY when the monitor
    // could not be created, NS_ERROR_FAILURE when no event queue became
    // available, and NS_OK once the event loop has returned.
    NS_IMETHOD Run() {
        printf("waiting\n");
        if (!mMonitor)
            return NS_ERROR_OUT_OF_MEMORY;

        PR_EnterMonitor(mMonitor);
        // Wait on mMonitor itself: it is the monitor we hold and the one
        // Init() notifies.  Loop so that a wakeup before Init() has finished
        // does not let us continue without an event queue.
        while (!mInitDone) {
            if (PR_Wait(mMonitor, PR_INTERVAL_NO_TIMEOUT) != PR_SUCCESS)
                break;      // e.g. the thread was interrupted while waiting
        }
        nsIEventQueue* queue = mEventQueue;
        PR_ExitMonitor(mMonitor);

        // Init() failed (or the wait was aborted): there is nothing to run.
        if (queue == nsnull)
            return NS_ERROR_FAILURE;

        printf("running\n");
        queue->EventLoop();
        printf("quitting\n");
        return NS_OK;
    }

    nsReader()
        : mEventQueue(nsnull), mStartTime(0), mThread(nsnull),
          mInitDone(PR_FALSE) {
        NS_INIT_REFCNT();
        mMonitor = PR_NewMonitor();
    }

    virtual ~nsReader() {
        NS_IF_RELEASE(mThread);
        NS_IF_RELEASE(mEventQueue);
        // PR_NewMonitor may have failed in the constructor.
        if (mMonitor)
            PR_DestroyMonitor(mMonitor);
    }

    // Remembers (and addrefs) the thread running this reader, obtains an
    // event queue from the event queue service and wakes up Run().
    //
    // Returns the first failure encountered, NS_OK otherwise.  The monitor is
    // released and Run() is notified on every path, so a failed Init() can
    // neither leave the monitor locked nor leave the reader thread waiting
    // forever.
    nsresult Init(nsIThread* thread) {
        if (!mMonitor)
            return NS_ERROR_OUT_OF_MEMORY;

        nsresult rv;
        mThread = thread;
        NS_ADDREF(mThread);

        // NOTE(review): prthread is fetched but never used; the queue below
        // is created/looked up for PR_CurrentThread(), i.e. the thread that
        // calls Init(), not for the reader thread -- confirm this is intended.
        PRThread* prthread;
        thread->GetPRThread(&prthread);

        PR_EnterMonitor(mMonitor);

        NS_WITH_SERVICE(nsIEventQueueService, eventQService, kEventQueueServiceCID, &rv);
        if (NS_SUCCEEDED(rv))
            rv = eventQService->CreateThreadEventQueue();
        if (NS_SUCCEEDED(rv))
            rv = eventQService->GetThreadEventQueue(PR_CurrentThread(), &mEventQueue);

        // wake up event loop
        mInitDone = PR_TRUE;
        PR_Notify(mMonitor);
        PR_ExitMonitor(mMonitor);

        return rv;
    }

    // Returns the event queue obtained by Init() (not addrefed), or nsnull.
    nsIEventQueue* GetEventQueue() { return mEventQueue; }

    // Records the start time used by OnStopBinding to compute the duration.
    NS_IMETHOD OnStartBinding(nsISupports* context) {
        PR_EnterMonitor(mMonitor);
        printf("start binding\n");
        mStartTime = PR_IntervalNow();
        PR_ExitMonitor(mMonitor);
        return NS_OK;
    }

    // Reads up to aLength bytes from aIStream, echoes them to stdout and adds
    // the number of bytes read to gVolume.  Returns the stream's failure code
    // if a read fails; reaching the end of the stream is not an error.
    NS_IMETHOD OnDataAvailable(nsISupports* context,
                               nsIBufferInputStream *aIStream,
                               PRUint32 aSourceOffset,
                               PRUint32 aLength) {
        nsresult rv = NS_OK;
        PR_EnterMonitor(mMonitor);

        char buf[1025];
        while (aLength > 0) {
            // Never ask for more than we were told is available, so that
            // the subtraction below cannot wrap around.
            PRUint32 want = (aLength < 1024) ? aLength : 1024;
            PRUint32 amt = 0;
            rv = aIStream->Read(buf, want, &amt);
            if (rv == NS_BASE_STREAM_EOF) {
                rv = NS_OK;
                break;
            }
            // Stop on errors, and on empty reads which would otherwise make
            // this loop spin forever.
            if (NS_FAILED(rv) || amt == 0)
                break;

            buf[amt] = '\0';
            // The data is file content, never a format string.
            fputs(buf, stdout);
            aLength -= amt;
            gVolume += amt;
        }

        PR_ExitMonitor(mMonitor);
        return rv;
    }

    // Adds the elapsed time since OnStartBinding to gDuration, then
    // interrupts the reader thread and returns the result of that call.
    NS_IMETHOD OnStopBinding(nsISupports* context,
                             nsresult aStatus,
                             const PRUnichar* aMsg) {
        PR_EnterMonitor(mMonitor);
        PRIntervalTime endTime = PR_IntervalNow();
        gDuration += (endTime - mStartTime);
        printf("stop binding, %d\n", aStatus);
        PR_ExitMonitor(mMonitor);

        // get me out of my event loop
        return mThread->Interrupt();
    }

    NS_IMETHOD OnStartRequest(nsISupports* context) {
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD OnStopRequest(nsISupports* context,
                             nsresult aStatus,
                             const PRUnichar* aMsg) {
        return NS_ERROR_NOT_IMPLEMENTED;
    }

protected:
    nsIEventQueue*      mEventQueue;    // owned; set by Init()
    PRIntervalTime      mStartTime;     // set by OnStartBinding()
    nsIThread*          mThread;        // owned; set by Init()

private:
    PRMonitor*          mMonitor;       // guards the members above; may be null
    PRBool              mInitDone;      // PR_TRUE once Init() has finished
};

NS_IMPL_ADDREF(nsReader);
NS_IMPL_RELEASE(nsReader);

// Hands out nsIRunnable (also used for nsISupports) and nsIStreamListener.
// On failure *aInstancePtr is set to null so callers can safely test it.
NS_IMETHODIMP
nsReader::QueryInterface(const nsIID& aIID, void* *aInstancePtr)
{
    if (NULL == aInstancePtr) {
        return NS_ERROR_NULL_POINTER;
    }
    if (aIID.Equals(nsIRunnable::GetIID()) ||
        aIID.Equals(kISupportsIID)) {
        *aInstancePtr = NS_STATIC_CAST(nsIRunnable*, this);
        NS_ADDREF_THIS();
        return NS_OK;
    }
    if (aIID.Equals(nsIStreamListener::GetIID())) {
        *aInstancePtr = NS_STATIC_CAST(nsIStreamListener*, this);
        NS_ADDREF_THIS();
        return NS_OK;
    }
    *aInstancePtr = nsnull;
    return NS_NOINTERFACE;
}

////////////////////////////////////////////////////////////////////////////////

// Synchronously pushes the file at `path` through `reader`:
// OnStartBinding, then OnDataAvailable for every chunk pulled into a buffer
// input stream, then OnStopBinding with the resulting status.
//
// Returns the value returned by reader->OnStopBinding().
nsresult
Simulated_nsFileTransport_Run(nsReader* reader, const char* path)
{
    // duplicate the work of nsFileTransport::Run here
#define NS_FILE_TRANSPORT_BUFFER_SIZE   (4*1024)

    // Everything that `goto done` may skip over is declared (and nulled) up
    // front so the cleanup code never sees an uninitialized pointer.
    nsresult rv;
    nsISupports* fs = nsnull;
    nsIInputStream* fileStr = nsnull;
    nsIBuffer* buf = nsnull;
    nsIBufferInputStream* bufStr = nsnull;
    nsFileSpec spec(path);
    PRUint32 sourceOffset = 0;

    rv = reader->OnStartBinding(nsnull);
    if (NS_FAILED(rv)) goto done;       // XXX should this abort the transfer?

    rv = NS_NewTypicalInputFileStream(&fs, spec);
    if (NS_FAILED(rv)) goto done;

    rv = fs->QueryInterface(nsIInputStream::GetIID(), (void**)&fileStr);
    NS_RELEASE(fs);
    if (NS_FAILED(rv)) goto done;

    rv = NS_NewBuffer(&buf, NS_FILE_TRANSPORT_BUFFER_SIZE,
                      NS_FILE_TRANSPORT_BUFFER_SIZE, nsnull);
    if (NS_FAILED(rv)) goto done;

    rv = NS_NewBufferInputStream(&bufStr, buf);
    if (NS_FAILED(rv)) goto done;

    while (PR_TRUE) {
        PRUint32 amt = 0;
        /* id'l change to FillFrom... */
        rv = bufStr->FillFrom(fileStr, spec.GetFileSize(), &amt);
        if (rv == NS_BASE_STREAM_EOF) {
            rv = NS_OK;
            break;
        }
        if (NS_FAILED(rv)) break;

        // Nothing was transferred and no EOF was reported (e.g. an empty
        // file): stop instead of looping forever.
        if (amt == 0) break;

        rv = reader->OnDataAvailable(nsnull, bufStr, sourceOffset, amt);
        if (NS_FAILED(rv)) break;

        sourceOffset += amt;
    }

  done:
    NS_IF_RELEASE(bufStr);
    // Drop our own reference to the buffer.
    // NOTE(review): assumes the buffer input stream holds its own reference
    // to the buffer, per the usual XPCOM ownership rules -- confirm.
    NS_IF_RELEASE(buf);
    NS_IF_RELEASE(fileStr);

    rv = reader->OnStopBinding(nsnull, rv, nsnull);
    return rv;
}

////////////////////////////////////////////////////////////////////////////////

// Creates an nsReader, starts a thread running it and initializes the reader
// with that thread.  On success *aReader and *aThread each carry a reference
// the caller must release; on failure both are null.
static nsresult
StartReader(nsReader** aReader, nsIThread** aThread)
{
    *aReader = nsnull;
    *aThread = nsnull;

    nsReader* reader = new nsReader();
    NS_ASSERTION(reader, "out of memory");
    if (!reader)
        return NS_ERROR_OUT_OF_MEMORY;
    NS_ADDREF(reader);

    nsIThread* readerThread = nsnull;
    nsresult rv = NS_NewThread(&readerThread, reader);
    NS_ASSERTION(NS_SUCCEEDED(rv), "new thread failed");
    if (NS_FAILED(rv)) {
        NS_RELEASE(reader);
        return rv;
    }

    rv = reader->Init(readerThread);
    NS_ASSERTION(NS_SUCCEEDED(rv), "init failed");
    if (NS_FAILED(rv)) {
        NS_RELEASE(readerThread);
        NS_RELEASE(reader);
        return rv;
    }

    *aReader = reader;
    *aThread = readerThread;
    return NS_OK;
}

// Joins every thread stored in `threads`.  The array itself is not released.
static void
JoinThreads(nsISupportsArray* threads)
{
    PRUint32 threadCount = 0;
    nsresult rv = threads->Count(&threadCount);
    if (NS_FAILED(rv))
        return;

    for (PRUint32 i = 0; i < threadCount; i++) {
        // ElementAt hands back a reference that is released below.
        nsIThread* thread = NS_STATIC_CAST(nsIThread*, threads->ElementAt(i));
        if (thread) {
            thread->Join();
            NS_RELEASE(thread);
        }
    }
}

// Feeds every entry of directory `dirName`, one after the other, through a
// fresh nsReader using Simulated_nsFileTransport_Run, then prints the elapsed
// time and the number of bytes read and resets gVolume.
void
SerialReadTest(char* dirName)
{
    nsresult rv;
    PRStatus status;

    PRDir* dir = PR_OpenDir(dirName);
    NS_ASSERTION(dir, "bad dir");
    if (!dir)
        return;

    nsISupportsArray* threads = nsnull;
    rv = NS_NewISupportsArray(&threads);
    NS_ASSERTION(NS_SUCCEEDED(rv), "NS_NewISupportsArray failed");
    if (NS_FAILED(rv)) {
        PR_CloseDir(dir);
        return;
    }

    PRIntervalTime startTime = PR_IntervalNow();
    PRDirEntry* entry;
    while ((entry = PR_ReadDir(dir, PR_SKIP_BOTH)) != nsnull) {
        nsFileSpec spec(dirName);
        spec += entry->name;

        nsReader* reader = nsnull;
        nsIThread* readerThread = nsnull;
        rv = StartReader(&reader, &readerThread);
        if (NS_FAILED(rv))
            continue;

        nsIStreamListener* listener = nsnull;
        reader->QueryInterface(nsIStreamListener::GetIID(), (void**)&listener);
        NS_ASSERTION(listener, "QI failed");

        rv = Simulated_nsFileTransport_Run(reader, spec);
        NS_ASSERTION(NS_SUCCEEDED(rv), "Simulated_nsFileTransport_Run failed");

        // the reader thread will hang on to these objects until it quits
        NS_IF_RELEASE(listener);
        NS_RELEASE(readerThread);
        NS_RELEASE(reader);
    }
    PRIntervalTime endTime = PR_IntervalNow();
    printf("duration %d ms, volume %d\n",
           PR_IntervalToMilliseconds(endTime - startTime),
           gVolume);
    gVolume = 0;

    // NOTE(review): nothing is ever appended to `threads` in this test, so
    // this join is a no-op and the reader threads are not waited for --
    // confirm whether they should be appended as in ParallelReadTest.
    JoinThreads(threads);
    NS_RELEASE(threads);

    status = PR_CloseDir(dir);
    NS_ASSERTION(status == PR_SUCCESS, "can't close dir");
}

// Starts an asynchronous read, through the file transport service `fts`, for
// every entry of directory `dirName`, each delivering to its own nsReader and
// reader thread, then joins the reader threads.
void
ParallelReadTest(char* dirName, nsIFileTransportService* fts)
{
    nsresult rv;
    PRStatus status;

    PRDir* dir = PR_OpenDir(dirName);
    NS_ASSERTION(dir, "bad dir");
    if (!dir)
        return;

    nsISupportsArray* threads = nsnull;
    rv = NS_NewISupportsArray(&threads);
    NS_ASSERTION(NS_SUCCEEDED(rv), "NS_NewISupportsArray failed");
    if (NS_FAILED(rv)) {
        PR_CloseDir(dir);
        return;
    }

    PRDirEntry* entry;
    while ((entry = PR_ReadDir(dir, PR_SKIP_BOTH)) != nsnull) {
        nsFileSpec spec(dirName);
        spec += entry->name;

        nsReader* reader = nsnull;
        nsIThread* readerThread = nsnull;
        rv = StartReader(&reader, &readerThread);
        if (NS_FAILED(rv))
            continue;

        nsIStreamListener* listener = nsnull;
        rv = reader->QueryInterface(nsIStreamListener::GetIID(), (void**)&listener);
        NS_ASSERTION(listener, "QI failed");

        nsIChannel* trans = nsnull;
        if (NS_SUCCEEDED(rv)) {
            rv = fts->CreateTransport(spec, &trans);
            NS_ASSERTION(NS_SUCCEEDED(rv), "create failed");
        }
        if (NS_SUCCEEDED(rv)) {
            rv = trans->AsyncRead(0, -1, nsnull, reader->GetEventQueue(), listener);
            NS_ASSERTION(NS_SUCCEEDED(rv), "AsyncRead failed");
        }

        if (NS_SUCCEEDED(rv)) {
            threads->AppendElement(readerThread);
        }
        else {
            // No read was started, so OnStopBinding will never interrupt
            // this thread: interrupt it here and do not wait for it below.
            readerThread->Interrupt();
        }

        // the reader thread will hang on to these objects until it quits
        NS_IF_RELEASE(trans);
        NS_IF_RELEASE(listener);
        NS_RELEASE(reader);
        NS_RELEASE(readerThread);
    }

    // now that we've forked all the async requests, wait until they're done
    JoinThreads(threads);
    NS_RELEASE(threads);

    status = PR_CloseDir(dir);
    NS_ASSERTION(status == PR_SUCCESS, "can't close dir");
}

// Usage: <program> <in-dir>
// Registers components, runs the serial and the parallel read test over the
// given directory and prints the accumulated duration and volume.
// Returns 0 on success and -1 on a usage or startup error.
int
main(int argc, char* argv[])
{
    nsresult rv;

    if (argc < 2) {
        printf("usage: %s <in-dir>\n", argv[0]);
        return -1;
    }
    char* dirName = argv[1];

    // XXX why do I have to do this?!
    rv = nsComponentManager::AutoRegister(nsIComponentManager::NS_Startup,
                                          "components");
    // An nsresult is not a usable exit status (its low byte may be 0).
    if (NS_FAILED(rv)) return -1;

    NS_WITH_SERVICE(nsIFileTransportService, fts, kFileTransportServiceCID, &rv);
    if (NS_FAILED(rv)) return -1;

    SerialReadTest(dirName);
    ParallelReadTest(dirName, fts);

    fts->ProcessPendingRequests();

    printf("duration %d ms, volume %d\n",
           PR_IntervalToMilliseconds(gDuration),
           gVolume);
    gVolume = 0;

    return 0;
}